Skip to content

Synchronization layer #273

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Apr 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env rake

require_relative './lib/extension_helper'
require 'concurrent/native_extensions'

## load the two gemspec files
CORE_GEMSPEC = Gem::Specification.load('concurrent-ruby.gemspec')
Expand All @@ -12,7 +12,7 @@ GEM_NAME = 'concurrent-ruby'
EXTENSION_NAME = 'extension'
JAVA_EXT_NAME = 'concurrent_ruby_ext'

if Concurrent.jruby?
if Concurrent.on_jruby?
CORE_GEM = "#{GEM_NAME}-#{Concurrent::VERSION}-java.gem"
else
CORE_GEM = "#{GEM_NAME}-#{Concurrent::VERSION}.gem"
Expand All @@ -35,7 +35,7 @@ Dir.glob('tasks/**/*.rake').each do |rakefile|
safe_load rakefile
end

if Concurrent.jruby?
if Concurrent.on_jruby?

## create the compile task for the JRuby-specific gem
require 'rake/javaextensiontask'
Expand Down Expand Up @@ -95,15 +95,15 @@ end
namespace :build do

build_deps = [:clean]
build_deps << :compile if Concurrent.jruby?
build_deps << :compile if Concurrent.on_jruby?

desc "Build #{CORE_GEM} into the pkg directory"
task :core => build_deps do
sh "gem build #{CORE_GEMSPEC.name}.gemspec"
sh 'mv *.gem pkg/'
end

unless Concurrent.jruby?
unless Concurrent.on_jruby?
desc "Build #{EXTENSION_GEM} into the pkg directory"
task :ext => [:clean] do
sh "gem build #{EXT_GEMSPEC.name}.gemspec"
Expand All @@ -120,7 +120,7 @@ namespace :build do
end
end

if Concurrent.jruby?
if Concurrent.on_jruby?
desc 'Build JRuby-specific core gem (alias for `build:core`)'
task :build => ['build:core']
else
Expand Down
1 change: 0 additions & 1 deletion concurrent-ruby-ext.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ Gem::Specification.new do |s|

s.files = Dir['ext/**/*.{h,c,cpp}']
s.files += [
'lib/extension_helper.rb',
'lib/concurrent/atomic_reference/concurrent_update_error.rb',
'lib/concurrent/atomic_reference/direct_update.rb',
'lib/concurrent/atomic_reference/numeric_cas_wrapper.rb',
Expand Down
1 change: 1 addition & 0 deletions ext/ConcurrentRubyExtService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public boolean basicLoad(final Ruby runtime) throws IOException {
new com.concurrent_ruby.ext.JavaAtomicBooleanLibrary().load(runtime, false);
new com.concurrent_ruby.ext.JavaAtomicFixnumLibrary().load(runtime, false);
new com.concurrent_ruby.ext.JavaSemaphoreLibrary().load(runtime, false);
new com.concurrent_ruby.ext.SynchronizationLibrary().load(runtime, false);
return true;
}
}
106 changes: 106 additions & 0 deletions ext/com/concurrent_ruby/ext/SynchronizationLibrary.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.concurrent_ruby.ext;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyModule;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ObjectAllocator;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.load.Library;
import org.jruby.runtime.Block;
import org.jruby.RubyBoolean;
import org.jruby.RubyNil;
import org.jruby.runtime.ThreadContext;

public class SynchronizationLibrary implements Library {

public void load(Ruby runtime, boolean wrap) throws IOException {
RubyModule synchronizationModule = runtime.
defineModule("Concurrent").
defineModuleUnder("Synchronization");
RubyClass parentClass = synchronizationModule.getClass("AbstractObject");

if (parentClass == null)
throw runtime.newRuntimeError("Concurrent::Synchronization::AbstractObject is missing");

RubyClass synchronizedObjectJavaClass =
synchronizationModule.defineClassUnder("JavaObject", parentClass, JRUBYREFERENCE_ALLOCATOR);

synchronizedObjectJavaClass.defineAnnotatedMethods(JavaObject.class);
}

private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() {
public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
return new JavaObject(runtime, klazz);
}
};

@JRubyClass(name = "JavaObject", parent = "AbstractObject")
public static class JavaObject extends RubyObject {

public JavaObject(Ruby runtime, RubyClass metaClass) {
super(runtime, metaClass);
}

@JRubyMethod
public IRubyObject initialize(ThreadContext context) {
return context.nil;
}

@JRubyMethod(name = "synchronize")
public IRubyObject rubySynchronize(ThreadContext context, Block block) {
synchronized (this) {
return block.yield(context, null);
}
}

@JRubyMethod(name = "ns_wait", optional = 1)
public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) {
Ruby runtime = context.runtime;
if (args.length > 1) {
throw runtime.newArgumentError(args.length, 1);
}
Double timeout = null;
if (args.length > 0 && !args[0].isNil()) {
timeout = args[0].convertToFloat().getDoubleValue();
if (timeout < 0) {
throw runtime.newArgumentError("time interval must be positive");
}
}
if (Thread.interrupted()) {
throw runtime.newConcurrencyError("thread interrupted");
}
boolean success = false;
try {
success = context.getThread().wait_timeout(this, timeout);
} catch (InterruptedException ie) {
throw runtime.newConcurrencyError(ie.getLocalizedMessage());
} finally {
// An interrupt or timeout may have caused us to miss
// a notify that we consumed, so do another notify in
// case someone else is available to pick it up.
if (!success) {
this.notify();
}
}
return this;
}

@JRubyMethod(name = "ns_signal")
public IRubyObject nsSignal(ThreadContext context) {
notify();
return this;
}

@JRubyMethod(name = "ns_broadcast")
public IRubyObject nsBroadcast(ThreadContext context) {
notifyAll();
return this;
}
}
}
2 changes: 1 addition & 1 deletion ext/concurrent/extconf.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require 'fileutils'

require_relative '../../lib/extension_helper'
require 'concurrent/native_extensions'

EXTENSION_NAME = 'extension'

Expand Down
2 changes: 2 additions & 0 deletions lib/concurrent.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
require 'concurrent/version'

require 'concurrent/synchronization'

require 'concurrent/configuration'

require 'concurrent/actor'
Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent/actor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
require 'concurrent/executor/serialized_execution'
require 'concurrent/ivar'
require 'concurrent/logging'
require 'concurrent/atomic/synchronization'
require 'concurrent/synchronization'

module Concurrent
# TODO https://github.com/celluloid/celluloid/wiki/Supervision-Groups ?
Expand Down
4 changes: 2 additions & 2 deletions lib/concurrent/actor/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ module Actor
# @note devel: core should not block on anything, e.g. it cannot wait on
# children to terminate that would eat up all threads in task pool and
# deadlock
class Core
class Core < Synchronization::Object
include TypeCheck
include Concurrent::Logging
include Synchronization

# @!attribute [r] reference
# @return [Reference] reference to this actor which can be safely passed around
Expand Down Expand Up @@ -48,6 +47,7 @@ class Core
# any logging system
# @param [Proc] block for class instantiation
def initialize(opts = {}, &block)
super(&nil)
synchronize do
@mailbox = Array.new
@serialized_execution = SerializedExecution.new
Expand Down
9 changes: 5 additions & 4 deletions lib/concurrent/atomic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
# user that they should use the new implementation instead.

if defined?(Atomic)
warn <<-RUBY
warn <<-TXT
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider removing this entirely for 0.9. since this message was introduced in 0.8. We definitely do not want it in the 1.0 release.

[ATOMIC] Detected an `Atomic` class, which may indicate a dependency
on the ruby-atomic gem. That gem has been deprecated and merged into
the concurrent-ruby gem. Please use the Concurrent::Atomic class for
atomic references and not the Atomic class.
RUBY
TXT
end
#####################################################################

require_relative '../extension_helper'
require 'concurrent/native_extensions'
require 'concurrent/utility/engine'
require 'concurrent/atomic_reference/concurrent_update_error'
require 'concurrent/atomic_reference/mutex_atomic'

Expand All @@ -21,7 +22,7 @@
if /[^0fF]/ =~ ENV['FORCE_ATOMIC_FALLBACK']
ruby_engine = 'mutex_atomic'
else
ruby_engine = defined?(RUBY_ENGINE)? RUBY_ENGINE : 'ruby'
ruby_engine = Concurrent.ruby_engine
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

I really like this ruby_gem abstraction. We're starting to check this so often (and will continue to do so as we add more optimizations) that we needed a cleaner way to check.

end

require "concurrent/atomic_reference/#{ruby_engine}"
Expand Down
4 changes: 2 additions & 2 deletions lib/concurrent/atomic/atomic_boolean.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require_relative '../../extension_helper'
require 'concurrent/native_extensions'

module Concurrent

Expand Down Expand Up @@ -113,7 +113,7 @@ def make_false
end
end

if RUBY_PLATFORM == 'java'
if Concurrent.on_jruby?

class AtomicBoolean < JavaAtomicBoolean
end
Expand Down
4 changes: 2 additions & 2 deletions lib/concurrent/atomic/atomic_fixnum.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require_relative '../../extension_helper'
require 'concurrent/native_extensions'

module Concurrent

Expand Down Expand Up @@ -118,7 +118,7 @@ def compare_and_set(expect, update)
end
end

if RUBY_PLATFORM == 'java'
if Concurrent.on_jruby?

# @!macro atomic_fixnum
class AtomicFixnum < JavaAtomicFixnum
Expand Down
29 changes: 10 additions & 19 deletions lib/concurrent/atomic/count_down_latch.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require 'concurrent/atomic/condition'
require 'concurrent/synchronization'

module Concurrent

Expand All @@ -11,7 +11,7 @@ module Concurrent
# method. Each of the other threads calls `#count_down` when done with its work.
# When the latch counter reaches zero the waiting thread is unblocked and continues
# with its work. A `CountDownLatch` can be used only once. Its value cannot be reset.
class MutexCountDownLatch
class PureCountDownLatch < Synchronization::Object

# @!macro [attach] count_down_latch_method_initialize
#
Expand All @@ -21,12 +21,11 @@ class MutexCountDownLatch
#
# @raise [ArgumentError] if `count` is not an integer or is less than zero
def initialize(count = 1)
super()
unless count.is_a?(Fixnum) && count >= 0
raise ArgumentError.new('count must be in integer greater than or equal zero')
end
@mutex = Mutex.new
@condition = Condition.new
@count = count
synchronize { @count = count }
end

# @!macro [attach] count_down_latch_method_wait
Expand All @@ -37,25 +36,17 @@ def initialize(count = 1)
# to block indefinitely
# @return [Boolean] `true` if the `count` reaches zero else false on `timeout`
def wait(timeout = nil)
@mutex.synchronize do

remaining = Condition::Result.new(timeout)
while @count > 0 && remaining.can_wait?
remaining = @condition.wait(@mutex, remaining.remaining_time)
end

@count == 0
end
synchronize { ns_wait_until(timeout) { @count == 0 } }
end

# @!macro [attach] count_down_latch_method_count_down
#
# Signal the latch to decrement the counter. Will signal all blocked threads when
# the `count` reaches zero.
def count_down
@mutex.synchronize do
synchronize do
@count -= 1 if @count > 0
@condition.broadcast if @count == 0
ns_broadcast if @count == 0
end
end

Expand All @@ -65,11 +56,11 @@ def count_down
#
# @return [Fixnum] the current value of the counter
def count
@mutex.synchronize { @count }
synchronize { @count }
end
end

if RUBY_PLATFORM == 'java'
if Concurrent.on_jruby?

# @!macro count_down_latch
class JavaCountDownLatch
Expand Down Expand Up @@ -110,7 +101,7 @@ class CountDownLatch < JavaCountDownLatch
else

# @!macro count_down_latch
class CountDownLatch < MutexCountDownLatch
class CountDownLatch < PureCountDownLatch
end
end
end
Loading