Skip to content

Commit cdf25a6

Browse files
Add OnSubscribeFunction and refactor to support it
This is related to ongoing work related to covariant support at ReactiveX#331
1 parent 87bd786 commit cdf25a6

File tree

61 files changed

+328
-299
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+328
-299
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.groovy;
17+
18+
import groovy.lang.Closure;
19+
import rx.Observable.OnSubscribeFunc;
20+
import rx.Observer;
21+
import rx.Subscription;
22+
23+
/**
24+
* Concrete wrapper that accepts a {@link Closure} and produces a {@link OnSubscribeFunc}.
25+
*
26+
* @param <T>
27+
*/
28+
public class GroovyOnSubscribeFuncWrapper<T> implements OnSubscribeFunc<T> {
29+
30+
private final Closure<Subscription> closure;
31+
32+
public GroovyOnSubscribeFuncWrapper(Closure<Subscription> closure) {
33+
this.closure = closure;
34+
}
35+
36+
@Override
37+
public Subscription call(Observer<? super T> observer) {
38+
return closure.call(observer);
39+
}
40+
41+
}

language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/RxGroovyExtensionModule.java

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.codehaus.groovy.runtime.metaclass.MetaClassRegistryImpl;
3434

3535
import rx.Observable;
36+
import rx.Observable.OnSubscribeFunc;
3637
import rx.observables.BlockingObservable;
3738
import rx.util.functions.Action;
3839
import rx.util.functions.Function;
@@ -52,33 +53,6 @@ public RxGroovyExtensionModule() {
5253
super("RxGroovyExtensionModule", "1.0");
5354
}
5455

55-
/**
56-
* Keeping this code around a little while as it was hard to figure out ... and I'm still messing with it while debugging.
57-
*
58-
* Once the rest of this ExtensionModule stuff is working I'll delete this method.
59-
*
60-
* This is used for manually initializing rather than going via the org.codehaus.groovy.runtime.ExtensionModule properties file.
61-
*/
62-
public static void initializeManuallyForTesting() {
63-
System.out.println("initialize");
64-
MetaClassRegistryImpl mcRegistry = ((MetaClassRegistryImpl) GroovySystem.getMetaClassRegistry());
65-
// RxGroovyExtensionModule em = new RxGroovyExtensionModule();
66-
67-
Properties p = new Properties();
68-
p.setProperty("moduleFactory", "rx.lang.groovy.RxGroovyPropertiesModuleFactory");
69-
Map<CachedClass, List<MetaMethod>> metaMethods = new HashMap<CachedClass, List<MetaMethod>>();
70-
mcRegistry.registerExtensionModuleFromProperties(p, RxGroovyExtensionModule.class.getClassLoader(), metaMethods);
71-
72-
for (ExtensionModule m : mcRegistry.getModuleRegistry().getModules()) {
73-
System.out.println("Module: " + m.getName());
74-
}
75-
76-
for (CachedClass cc : metaMethods.keySet()) {
77-
System.out.println("Adding MetaMethods to CachedClass: " + cc);
78-
cc.addNewMopMethods(metaMethods.get(cc));
79-
}
80-
}
81-
8256
@SuppressWarnings("rawtypes")
8357
@Override
8458
public List<MetaMethod> getMetaMethods() {
@@ -135,6 +109,8 @@ public Object invoke(Object object, Object[] arguments) {
135109
if (o instanceof Closure) {
136110
if (Action.class.isAssignableFrom(m.getParameterTypes()[i])) {
137111
newArgs[i] = new GroovyActionWrapper((Closure) o);
112+
} else if(OnSubscribeFunc.class.isAssignableFrom(m.getParameterTypes()[i])) {
113+
newArgs[i] = new GroovyOnSubscribeFuncWrapper((Closure) o);
138114
} else {
139115
newArgs[i] = new GroovyFunctionWrapper((Closure) o);
140116
}

language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.mockito.MockitoAnnotations;
3131

3232
import rx.Notification;
3333
import rx.Observable;
34+
import rx.Observable.OnSubscribeFunc;
3435
import rx.Observer;
3536
import rx.Subscription;
3637
import rx.observables.GroupedObservable;
@@ -296,7 +297,7 @@ def class ObservableTests {
296297
}
297298

298299

299-
def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {
300+
def class AsyncObservable implements OnSubscribeFunc {
300301

301302
public Subscription call(final Observer<Integer> observer) {
302303
new Thread(new Runnable() {

rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/AbstractButtonSource.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@
1515
*/
1616
package rx.swing.sources;
1717

18-
import static org.mockito.Mockito.mock;
19-
import static org.mockito.Mockito.never;
20-
import static org.mockito.Mockito.times;
21-
import static org.mockito.Mockito.verify;
18+
import static org.mockito.Mockito.*;
2219

2320
import java.awt.event.ActionEvent;
2421
import java.awt.event.ActionListener;
@@ -29,20 +26,21 @@
2926
import org.mockito.Matchers;
3027

3128
import rx.Observable;
29+
import rx.Observable.OnSubscribeFunc;
3230
import rx.Observer;
3331
import rx.Subscription;
32+
import rx.observables.SwingObservable;
3433
import rx.subscriptions.Subscriptions;
3534
import rx.util.functions.Action0;
3635
import rx.util.functions.Action1;
37-
import rx.util.functions.Func1;
3836

3937
public enum AbstractButtonSource { ; // no instances
4038

4139
/**
4240
* @see SwingObservable.fromButtonAction
4341
*/
4442
public static Observable<ActionEvent> fromActionOf(final AbstractButton button) {
45-
return Observable.create(new Func1<Observer<? super ActionEvent>, Subscription>() {
43+
return Observable.create(new OnSubscribeFunc<ActionEvent>() {
4644
@Override
4745
public Subscription call(final Observer<? super ActionEvent> observer) {
4846
final ActionListener listener = new ActionListener() {

rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/ComponentEventSource.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
*/
1616
package rx.swing.sources;
1717

18-
import static rx.swing.sources.ComponentEventSource.Predicate.RESIZED;
18+
import static rx.swing.sources.ComponentEventSource.Predicate.*;
1919

2020
import java.awt.Component;
2121
import java.awt.Dimension;
2222
import java.awt.event.ComponentEvent;
2323
import java.awt.event.ComponentListener;
2424

2525
import rx.Observable;
26+
import rx.Observable.OnSubscribeFunc;
2627
import rx.Observer;
2728
import rx.Subscription;
2829
import rx.observables.SwingObservable;
@@ -36,7 +37,7 @@ public enum ComponentEventSource { ; // no instances
3637
* @see SwingObservable.fromComponentEvents
3738
*/
3839
public static Observable<ComponentEvent> fromComponentEventsOf(final Component component) {
39-
return Observable.create(new Func1<Observer<? super ComponentEvent>, Subscription>() {
40+
return Observable.create(new OnSubscribeFunc<ComponentEvent>() {
4041
@Override
4142
public Subscription call(final Observer<? super ComponentEvent> observer) {
4243
final ComponentListener listener = new ComponentListener() {

rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/KeyEventSource.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,8 @@
1515
*/
1616
package rx.swing.sources;
1717

18-
import static java.util.Arrays.asList;
19-
import static org.mockito.Mockito.inOrder;
20-
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.never;
22-
import static org.mockito.Mockito.times;
23-
import static org.mockito.Mockito.verify;
18+
import static java.util.Arrays.*;
19+
import static org.mockito.Mockito.*;
2420

2521
import java.awt.Component;
2622
import java.awt.event.KeyEvent;
@@ -36,12 +32,12 @@
3632
import org.mockito.Matchers;
3733

3834
import rx.Observable;
35+
import rx.Observable.OnSubscribeFunc;
3936
import rx.Observer;
4037
import rx.Subscription;
4138
import rx.subscriptions.Subscriptions;
4239
import rx.util.functions.Action0;
4340
import rx.util.functions.Action1;
44-
import rx.util.functions.Func1;
4541
import rx.util.functions.Func2;
4642

4743
public enum KeyEventSource { ; // no instances
@@ -50,7 +46,7 @@ public enum KeyEventSource { ; // no instances
5046
* @see SwingObservable.fromKeyEvents(Component)
5147
*/
5248
public static Observable<KeyEvent> fromKeyEventsOf(final Component component) {
53-
return Observable.create(new Func1<Observer<? super KeyEvent>, Subscription>() {
49+
return Observable.create(new OnSubscribeFunc<KeyEvent>() {
5450
@Override
5551
public Subscription call(final Observer<? super KeyEvent> observer) {
5652
final KeyListener listener = new KeyListener() {

rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/MouseEventSource.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,20 @@
2121
import java.awt.event.MouseMotionListener;
2222

2323
import rx.Observable;
24+
import rx.Observable.OnSubscribeFunc;
2425
import rx.Observer;
2526
import rx.Subscription;
27+
import rx.observables.SwingObservable;
2628
import rx.subscriptions.Subscriptions;
2729
import rx.util.functions.Action0;
28-
import rx.util.functions.Func1;
2930

3031
public enum MouseEventSource { ; // no instances
3132

3233
/**
3334
* @see SwingObservable.fromMouseEvents
3435
*/
3536
public static Observable<MouseEvent> fromMouseEventsOf(final Component component) {
36-
return Observable.create(new Func1<Observer<? super MouseEvent>, Subscription>() {
37+
return Observable.create(new OnSubscribeFunc<MouseEvent>() {
3738
@Override
3839
public Subscription call(final Observer<? super MouseEvent> observer) {
3940
final MouseListener listener = new MouseListener() {
@@ -78,7 +79,7 @@ public void call() {
7879
* @see SwingObservable.fromMouseMotionEvents
7980
*/
8081
public static Observable<MouseEvent> fromMouseMotionEventsOf(final Component component) {
81-
return Observable.create(new Func1<Observer<? super MouseEvent>, Subscription>() {
82+
return Observable.create(new OnSubscribeFunc<MouseEvent>() {
8283
@Override
8384
public Subscription call(final Observer<? super MouseEvent> observer) {
8485
final MouseMotionListener listener = new MouseMotionListener() {

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,19 @@ public class Observable<T> {
113113

114114
private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
115115

116-
private final Func1<? super Observer<? super T>, ? extends Subscription> onSubscribe;
116+
private final OnSubscribeFunc<T> onSubscribe;
117117

118+
/**
119+
* Function interface for work to be performed when an {@link Observable} is subscribed to via {@link Observable#subscribe(Observer)}
120+
*
121+
* @param <T>
122+
*/
123+
public static interface OnSubscribeFunc<T> extends Function<T> {
124+
125+
public Subscription call(Observer<? super T> t1);
126+
127+
}
128+
118129
/**
119130
* Observable with Function to execute when subscribed to.
120131
* <p>
@@ -124,7 +135,7 @@ public class Observable<T> {
124135
* @param onSubscribe
125136
* {@link Func1} to be executed when {@link #subscribe(Observer)} is called.
126137
*/
127-
protected Observable(Func1<? super Observer<? super T>, ? extends Subscription> onSubscribe) {
138+
protected Observable(OnSubscribeFunc<T> onSubscribe) {
128139
this.onSubscribe = onSubscribe;
129140
}
130141

@@ -162,7 +173,7 @@ protected Observable() {
162173
*/
163174
public Subscription subscribe(Observer<? super T> observer) {
164175
// allow the hook to intercept and/or decorate
165-
Func1<? super Observer<? super T>, ? extends Subscription> onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe);
176+
OnSubscribeFunc<T> onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe);
166177
// validate and proceed
167178
if (observer == null) {
168179
throw new IllegalArgumentException("observer can not be null");
@@ -402,7 +413,7 @@ private void handleError(Throwable e) {
402413
*/
403414
private static class NeverObservable<T> extends Observable<T> {
404415
public NeverObservable() {
405-
super(new Func1<Observer<? super T>, Subscription>() {
416+
super(new OnSubscribeFunc<T>() {
406417

407418
@Override
408419
public Subscription call(Observer<? super T> t1) {
@@ -422,7 +433,7 @@ public Subscription call(Observer<? super T> t1) {
422433
private static class ThrowObservable<T> extends Observable<T> {
423434

424435
public ThrowObservable(final Throwable exception) {
425-
super(new Func1<Observer<? super T>, Subscription>() {
436+
super(new OnSubscribeFunc<T>() {
426437

427438
/**
428439
* Accepts an {@link Observer} and calls its {@link Observer#onError onError} method.
@@ -466,7 +477,7 @@ public Subscription call(Observer<? super T> observer) {
466477
* @return an Observable that, when an {@link Observer} subscribes to it, will execute the given
467478
* function
468479
*/
469-
public static <T> Observable<T> create(Func1<? super Observer<? super T>, ? extends Subscription> func) {
480+
public static <T> Observable<T> create(OnSubscribeFunc<T> func) {
470481
return new Observable<T>(func);
471482
}
472483

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
*/
6161
public class BlockingObservable<T> extends Observable<T> {
6262

63-
protected BlockingObservable(Func1<? super Observer<? super T>, ? extends Subscription> onSubscribe) {
63+
protected BlockingObservable(OnSubscribeFunc<T> onSubscribe) {
6464
super(onSubscribe);
6565
}
6666

@@ -76,7 +76,7 @@ private BlockingObservable() {
7676
* Convert an Observable into a BlockingObservable.
7777
*/
7878
public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
79-
return new BlockingObservable<T>(new Func1<Observer<? super T>, Subscription>() {
79+
return new BlockingObservable<T>(new OnSubscribeFunc<T>() {
8080

8181
@Override
8282
public Subscription call(Observer<? super T> observer) {
@@ -784,7 +784,7 @@ public void testToIterable() {
784784

785785
@Test(expected = TestException.class)
786786
public void testToIterableWithException() {
787-
BlockingObservable<String> obs = BlockingObservable.from(create(new Func1<Observer<? super String>, Subscription>() {
787+
BlockingObservable<String> obs = BlockingObservable.from(create(new OnSubscribeFunc<String>() {
788788

789789
@Override
790790
public Subscription call(Observer<? super String> observer) {
@@ -807,7 +807,7 @@ public Subscription call(Observer<? super String> observer) {
807807
@Test
808808
public void testForEachWithError() {
809809
try {
810-
BlockingObservable.from(Observable.create(new Func1<Observer<? super String>, Subscription>() {
810+
BlockingObservable.from(Observable.create(new OnSubscribeFunc<String>() {
811811

812812
@Override
813813
public Subscription call(final Observer<? super String> observer) {

rxjava-core/src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import rx.Observable;
1919
import rx.Observer;
2020
import rx.Subscription;
21-
import rx.util.functions.Func1;
2221

2322
/**
2423
* A ConnectableObservable resembles an ordinary {@link Observable}, except that it does not begin
@@ -37,7 +36,7 @@
3736

3837
public abstract class ConnectableObservable<T> extends Observable<T> {
3938

40-
protected ConnectableObservable(Func1<? super Observer<? super T>, ? extends Subscription> onSubscribe) {
39+
protected ConnectableObservable(OnSubscribeFunc<T> onSubscribe) {
4140
super(onSubscribe);
4241
}
4342

0 commit comments

Comments
 (0)