Skip to content

Edge development #274

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into from
Apr 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
7d2b87d
Extracted actor & channel into an "edge" gem.
jdantonio Mar 21, 2015
05b6e76
First draft IVar-Future-Promise-Probe unification
pitr-ch Oct 23, 2014
ae2b6a2
Adding Delay and benchmark of the synchronization
pitr-ch Oct 27, 2014
3dfd5c3
Various improvements
pitr-ch Nov 4, 2014
fc802e5
Allows to build promise trees with lazy evaluated branches
pitr-ch Nov 4, 2014
aa4bf37
Adding more documentation
pitr-ch Nov 6, 2014
42d62af
Add join method on instance, with alias :+
pitr-ch Nov 8, 2014
a14278a
Move #post method to different Shortcut module
pitr-ch Nov 10, 2014
3d62bdf
Adding doc for shortcut methods.
pitr-ch Nov 10, 2014
a12e866
Add Scheduled a Promise implementation replacing ScheduledTask.
pitr-ch Nov 10, 2014
0947cb6
Fix rescue behavior to be like Scala's Future#recover
pitr-ch Nov 12, 2014
6cffd19
First attempt to break classes down
pitr-ch Dec 3, 2014
7430ca0
Better break down of Promises to classes
pitr-ch Dec 5, 2014
29d2958
Readme updates
pitr-ch Apr 8, 2015
22e00b7
Simplify moving features between gems
pitr-ch Apr 8, 2015
916f613
Merge branch 'master' into edge-experimental-incubator
pitr-ch Apr 8, 2015
4a02498
Fix require paths
pitr-ch Apr 8, 2015
dc2bf4c
Merge branch 'edge-experimental-incubator' into devel
pitr-ch Apr 22, 2015
af23a18
Merge branch 'next' into devel
pitr-ch Apr 22, 2015
80b2e46
Add Concurrent.executor as an easy access point to global executors
pitr-ch Apr 9, 2015
84b1417
Synchronization and behavior separation
pitr-ch Apr 9, 2015
f92204a
Move the new Futures to Edge namespace
pitr-ch Apr 9, 2015
162b7c1
Use ns_wait_until
pitr-ch Apr 9, 2015
433559d
Separate synchronization and behavior in rest of the classes
pitr-ch Apr 9, 2015
b830e8f
Remove synchronization leaks
pitr-ch Apr 10, 2015
53425e8
Unify rspec configuration
pitr-ch Apr 12, 2015
11a3d3b
Updates to new synchronization module
pitr-ch Apr 22, 2015
29d0380
Merge branch 'master' into devel
pitr-ch Apr 23, 2015
8fd8bf2
Stabilize tests
pitr-ch Apr 23, 2015
701a6b6
Improvements
pitr-ch Apr 23, 2015
113fe07
Actor migrated to Edge::Future and integration added
pitr-ch Apr 23, 2015
0ac67e6
Merge branch 'master' into devel
pitr-ch Apr 24, 2015
4272db4
Improve tests
pitr-ch Apr 24, 2015
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,3 @@ ext/**/*.bundle
ext/**/*.so
ext/**/*.jar
pkg
*.gem
4 changes: 3 additions & 1 deletion .rspec
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
--require spec_helper
--format progress
--color
--backtrace
--format documentation
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
source 'https://rubygems.org'

gemspec name: 'concurrent-ruby'
gemspec name: 'concurrent-ruby-edge'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I was created the extension gem I ran into some problem with this. I can't remember what it was. That's why the extension gem isn't listed here. We'll need to take a look at this after we merge. Whatever it was may no longer be an issue.


group :development do
gem 'rake', '~> 10.4.2'
Expand Down
26 changes: 20 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,13 @@ This library contains a variety of concurrency abstractions at high and low leve

### High-level, general-purpose asynchronous concurrency abstractions

* [Actor](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Actor.html): Implements the Actor Model, where concurrent actors exchange messages.
* [Agent](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Agent.html): A single atomic value that represents an identity.
* [Async](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Async.html): A mixin module that provides simple asynchronous behavior to any standard class/object or object.
* [Future](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Future.html): An asynchronous operation that produces a value.
* [Dataflow](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent.html#dataflow-class_method): Built on Futures, Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available.
* [Promise](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.html): Similar to Futures, with more features.
* [ScheduledTask](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ScheduledTask.html): Like a Future scheduled for a specific future time.
* [TimerTask](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/TimerTask.html): A Thread that periodically wakes up to perform work at regular intervals.
* [Channel](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Channel.html): Communicating Sequential Processes (CSP).

### Java-inspired ThreadPools and other executors

Expand Down Expand Up @@ -90,6 +88,19 @@ This library contains a variety of concurrency abstractions at high and low leve
* [Software transactional memory](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/TVar.html) (TVar)
* [ReadWriteLock](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ReadWriteLock.html)

### Edge features

They are available in the `concurrent-ruby-edge` companion gem, install with `gem install concurrent-ruby-edge`.

These features are under active development and may change frequently. They are expected not to
keep backward compatibility (there may also lack tests and documentation). Semantic versions will
be obeyed though. Features developed in `concurrent-ruby-edge` are expected to move to `concurrent-ruby` when final.

* [Actor](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Actor.html):
Implements the Actor Model, where concurrent actors exchange messages.
* [Channel](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Channel.html):
Communicating Sequential Processes (CSP).

## Usage

All abstractions within this gem can be loaded simply by requiring it:
Expand All @@ -105,9 +116,7 @@ require 'concurrent' # everything

# groups

require 'concurrent/actor' # Concurrent::Actor and supporting code
require 'concurrent/atomics' # atomic and thread synchronization classes
require 'concurrent/channels' # Concurrent::Channel and supporting code
require 'concurrent/executors' # Thread pools and other executors
require 'concurrent/utilities' # utility methods such as processor count and timers

Expand All @@ -127,6 +136,11 @@ require 'concurrent/promise' # Concurrent::Promise
require 'concurrent/scheduled_task' # Concurrent::ScheduledTask
require 'concurrent/timer_task' # Concurrent::TimerTask
require 'concurrent/tvar' # Concurrent::TVar

# experimental - available in `concurrent-ruby-edge` companion gem

require 'concurrent/actor' # Concurrent::Actor and supporting code
require 'concurrent/channel ' # Concurrent::Channel and supporting code
```

## Installation
Expand All @@ -147,8 +161,8 @@ and run `bundle install` from your shell.

Potential performance improvements may be achieved under MRI by installing optional C extensions.
To minimize installation errors the C extensions are available in the `concurrent-ruby-ext` extension
gem. The extension gem lists `concurrent-ruby` as a dependency so it is not necessary to install both.
Simply install the extension gen:
gem. `concurrent-ruby` and `concurrent-ruby-ext` are always released together with same version.
Simply install the extension gen too:

```ruby
gem install concurrent-ruby-ext
Expand Down
43 changes: 28 additions & 15 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
#!/usr/bin/env rake

require 'concurrent/version'
require 'concurrent/native_extensions'

## load the two gemspec files
CORE_GEMSPEC = Gem::Specification.load('concurrent-ruby.gemspec')
EXT_GEMSPEC = Gem::Specification.load('concurrent-ruby-ext.gemspec')
EDGE_GEMSPEC = Gem::Specification.load('concurrent-ruby-edge.gemspec')

## constants used for compile/build tasks

GEM_NAME = 'concurrent-ruby'
EXTENSION_NAME = 'extension'
EXT_NAME = 'extension'
EDGE_NAME = 'edge'
JAVA_EXT_NAME = 'concurrent_ruby_ext'

if Concurrent.on_jruby?
CORE_GEM = "#{GEM_NAME}-#{Concurrent::VERSION}-java.gem"
else
CORE_GEM = "#{GEM_NAME}-#{Concurrent::VERSION}.gem"
EXTENSION_GEM = "#{GEM_NAME}-ext-#{Concurrent::VERSION}.gem"
EXT_GEM = "#{GEM_NAME}-ext-#{Concurrent::VERSION}.gem"
NATIVE_GEM = "#{GEM_NAME}-ext-#{Concurrent::VERSION}-#{Gem::Platform.new(RUBY_PLATFORM)}.gem"
EDGE_GEM = "#{GEM_NAME}-edge-#{Concurrent::EDGE_VERSION}.gem"
end

## safely load all the rake tasks in the `tasks` directory
Expand Down Expand Up @@ -49,7 +53,7 @@ elsif Concurrent.allow_c_extensions?
## create the compile tasks for the extension gem
require 'rake/extensiontask'

Rake::ExtensionTask.new(EXTENSION_NAME, EXT_GEMSPEC) do |ext|
Rake::ExtensionTask.new(EXT_NAME, EXT_GEMSPEC) do |ext|
ext.ext_dir = 'ext/concurrent'
ext.lib_dir = 'lib/concurrent'
ext.source_pattern = '*.{c,h}'
Expand All @@ -63,9 +67,9 @@ elsif Concurrent.allow_c_extensions?
'x64-mingw32' => 'x86_64-w64-mingw32'
}
platforms.each do |platform, prefix|
task "copy:#{EXTENSION_NAME}:#{platform}:#{ruby_version}" do |t|
task "copy:#{EXT_NAME}:#{platform}:#{ruby_version}" do |t|
%w[lib tmp/#{platform}/stage/lib].each do |dir|
so_file = "#{dir}/#{ruby_version[/^\d+\.\d+/]}/#{EXTENSION_NAME}.so"
so_file = "#{dir}/#{ruby_version[/^\d+\.\d+/]}/#{EXT_NAME}.so"
if File.exists?(so_file)
sh "#{prefix}-strip -S #{so_file}"
end
Expand Down Expand Up @@ -94,7 +98,11 @@ end

namespace :build do

build_deps = [:clean]
task :mkdir_pkg do
mkdir_p 'pkg'
end

build_deps = [:clean, 'build:mkdir_pkg']
build_deps << :compile if Concurrent.on_jruby?

desc "Build #{CORE_GEM} into the pkg directory"
Expand All @@ -104,17 +112,24 @@ namespace :build do
end

unless Concurrent.on_jruby?
desc "Build #{EXTENSION_GEM} into the pkg directory"
task :ext => [:clean] do

desc "Build #{EDGE_GEM} into the pkg directory"
task :edge => 'build:mkdir_pkg' do
sh "gem build #{EDGE_GEMSPEC.name}.gemspec"
sh 'mv *.gem pkg/'
end

desc "Build #{EXT_GEM} into the pkg directory"
task :ext => build_deps do
sh "gem build #{EXT_GEMSPEC.name}.gemspec"
sh 'mv *.gem pkg/'
end
end

if Concurrent.allow_c_extensions?
desc "Build #{NATIVE_GEM} into the pkg directory"
task :native do
sh "gem compile pkg/#{EXTENSION_GEM}"
task :native => 'build:mkdir_pkg' do
sh "gem compile pkg/#{EXT_GEM}"
sh 'mv *.gem pkg/'
end
end
Expand All @@ -124,8 +139,8 @@ if Concurrent.on_jruby?
desc 'Build JRuby-specific core gem (alias for `build:core`)'
task :build => ['build:core']
else
desc 'Build core and extension gems'
task :build => ['build:core', 'build:ext']
desc 'Build core, extension, and edge gems'
task :build => ['build:core', 'build:ext', 'build:edge']
end

## the RSpec task that compiles extensions when available
Expand All @@ -134,9 +149,7 @@ begin
require 'rspec'
require 'rspec/core/rake_task'

RSpec::Core::RakeTask.new(:spec) do |t|
t.rspec_opts = '--color --backtrace --format documentation'
end
RSpec::Core::RakeTask.new(:spec)

task :default => [:clean, :compile, :spec]
rescue LoadError
Expand Down
31 changes: 31 additions & 0 deletions concurrent-ruby-edge.gemspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
$:.push File.join(File.dirname(__FILE__), 'lib')

require 'concurrent/version'
require 'concurrent/file_map'

Gem::Specification.new do |s|
git_files = `git ls-files`.split("\n")

s.name = 'concurrent-ruby-edge'
s.version = Concurrent::EDGE_VERSION
s.platform = Gem::Platform::RUBY
s.authors = ["Jerry D'Antonio", 'The Ruby Concurrency Team']
s.email = ['[email protected]', '[email protected]']
s.homepage = 'http://www.concurrent-ruby.com'
s.summary = 'Edge features and additions to the concurrent-ruby gem.'
s.license = 'MIT'
s.date = Time.now.strftime('%Y-%m-%d')
s.files = Concurrent::FILE_MAP.fetch :edge
s.extra_rdoc_files = Dir['README*', 'LICENSE*']
s.require_paths = ['lib']
s.description = <<-TXT
These features are under active development and may change frequently. They are expected not to
keep backward compatibility (there may also lack tests and documentation). Semantic versions will
be obeyed though. Features developed in `concurrent-ruby-edge` are expected to move to `concurrent-ruby` when final.
Please see http://concurrent-ruby.com for more information.
TXT

s.required_ruby_version = '>= 1.9.3'

s.add_runtime_dependency 'concurrent-ruby', "~> #{Concurrent::VERSION}"
end
33 changes: 17 additions & 16 deletions concurrent-ruby.gemspec
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
$:.push File.join(File.dirname(__FILE__), 'lib')

require 'concurrent/version'
require 'concurrent/file_map'

Gem::Specification.new do |s|
s.name = 'concurrent-ruby'
s.version = Concurrent::VERSION
s.platform = Gem::Platform::RUBY
s.author = "Jerry D'Antonio"
s.email = '[email protected]'
s.homepage = 'http://www.concurrent-ruby.com'
s.summary = 'Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.'
s.license = 'MIT'
s.date = Time.now.strftime('%Y-%m-%d')
git_files = `git ls-files`.split("\n")

s.description = <<-EOF
Modern concurrency tools including agents, futures, promises, thread pools, actors, supervisors, and more.
Inspired by Erlang, Clojure, Go, JavaScript, actors, and classic concurrency patterns.
EOF

s.files = Dir['lib/**/*.rb']
s.name = 'concurrent-ruby'
s.version = Concurrent::VERSION
s.platform = Gem::Platform::RUBY
s.authors = ["Jerry D'Antonio", 'The Ruby Concurrency Team']
s.email = ['[email protected]', '[email protected]']
s.homepage = 'http://www.concurrent-ruby.com'
s.summary = 'Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.'
s.license = 'MIT'
s.date = Time.now.strftime('%Y-%m-%d')
s.files = Concurrent::FILE_MAP.fetch :core
s.extra_rdoc_files = Dir['README*', 'LICENSE*', 'CHANGELOG*']
s.require_paths = ['lib']
s.description = <<-EOF
Modern concurrency tools including agents, futures, promises, thread pools, actors, supervisors, and more.
Inspired by Erlang, Clojure, Go, JavaScript, actors, and classic concurrency patterns.
EOF

if defined?(JRUBY_VERSION)
s.files += Dir['lib/**/*.jar']
s.files += Dir['lib/**/*.jar']
s.platform = 'java'
else
s.add_runtime_dependency 'ref', '~> 1.0', '>= 1.0.5'
Expand Down
12 changes: 12 additions & 0 deletions doc/future-promise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Futures and Promises

New implementation added in version 0.8 differs from previous versions and has little in common.
{Future} represents a value which will become {#completed?} in future, it'll contain {#value} if {#success?} or a {#reason} if {#failed?}. It cannot be directly completed, there are implementations of abstract {Promise} class for that, so {Promise}'s only purpose is to complete a given {Future} object. They are always constructed as a Pair even in chaining methods like {#then}, {#rescue}, {#then_delay}, etc.

There is few {Promise} implementations:

- OuterPromise - only Promise used by users, can be completed by outer code. Constructed with {Concurrent::Next.promise} helper method.
- Immediate - internal implementation of Promise used to represent immediate evaluation of a block. Constructed with {Concurrent::Next.future} helper method.
- Delay - internal implementation of Promise used to represent delayed evaluation of a block. Constructed with {Concurrent::Next.delay} helper method.
- ConnectedPromise - used internally to support {Future#with_default_executor}

7 changes: 7 additions & 0 deletions lib/concurrent-edge.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
require 'concurrent'

require 'concurrent/actor'
require 'concurrent/channel'
require 'concurrent/edge/future'


2 changes: 0 additions & 2 deletions lib/concurrent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

require 'concurrent/configuration'

require 'concurrent/actor'
require 'concurrent/atomics'
require 'concurrent/channels'
require 'concurrent/collections'
require 'concurrent/errors'
require 'concurrent/executors'
Expand Down
11 changes: 5 additions & 6 deletions lib/concurrent/actor.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
require 'concurrent/configuration'
require 'concurrent/delay'
require 'concurrent/executor/serialized_execution'
require 'concurrent/ivar'
require 'concurrent/logging'
require 'concurrent/synchronization'
require 'concurrent/edge/future'

module Concurrent
# TODO https://github.com/celluloid/celluloid/wiki/Supervision-Groups ?
Expand Down Expand Up @@ -40,9 +39,9 @@ def self.current
Thread.current[:__current_actor__]
end

@root = Delay.new do
Core.new(parent: nil, name: '/', class: Root, initialized: ivar = IVar.new).reference.tap do
ivar.no_error!
@root = Concurrent.delay do
Core.new(parent: nil, name: '/', class: Root, initialized: future = Concurrent.future).reference.tap do
future.wait!
end
end

Expand Down Expand Up @@ -77,7 +76,7 @@ def self.spawn(*args, &block)

# as {.spawn} but it'll raise when Actor not initialized properly
def self.spawn!(*args, &block)
spawn(spawn_optionify(*args).merge(initialized: ivar = IVar.new), &block).tap { ivar.no_error! }
spawn(spawn_optionify(*args).merge(initialized: future = Concurrent.future), &block).tap { future.wait! }
end

# @overload spawn_optionify(context_class, name, *args)
Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent/actor/behaviour/abstract.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def broadcast(event)

def reject_envelope(envelope)
envelope.reject! ActorTerminated.new(reference)
dead_letter_routing << envelope unless envelope.ivar
dead_letter_routing << envelope unless envelope.future
log Logging::DEBUG, "rejected #{envelope.message} from #{envelope.sender_path}"
end
end
Expand Down
8 changes: 4 additions & 4 deletions lib/concurrent/actor/behaviour/sets_results.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Concurrent
module Actor
module Behaviour
# Collects returning value and sets the IVar in the {Envelope} or error on failure.
# Collects returning value and sets the CompletableFuture in the {Envelope} or error on failure.
class SetResults < Abstract
attr_reader :error_strategy

Expand All @@ -12,8 +12,8 @@ def initialize(core, subsequent, error_strategy)

def on_envelope(envelope)
result = pass envelope
if result != MESSAGE_PROCESSED && !envelope.ivar.nil?
envelope.ivar.set result
if result != MESSAGE_PROCESSED && !envelope.future.nil?
envelope.future.success result
end
nil
rescue => error
Expand All @@ -28,7 +28,7 @@ def on_envelope(envelope)
else
raise
end
envelope.ivar.fail error unless envelope.ivar.nil?
envelope.future.fail error unless envelope.future.nil?
end
end
end
Expand Down
Loading