Skip to content

Commit 87bd786

Browse files
Merge branch 'super-extends' of git://github.com/jmhofer/RxJava into covariant-support
2 parents 2d58f7c + f72806a commit 87bd786

File tree

93 files changed

+1383
-932
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+1383
-932
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ object RxImplicits {
106106
* type never escapes the for-comprehension
107107
*/
108108
implicit class ScalaObservable[A](wrapped: Observable[A]) {
109-
def map[B](f: A => B): Observable[B] = wrapped.map(f)
109+
def map[B](f: A => B): Observable[B] = wrapped.map[B](f)
110110
def flatMap[B](f: A => Observable[B]): Observable[B] = wrapped.mapMany(f)
111111
def foreach(f: A => Unit): Unit = wrapped.toBlockingObservable.forEach(f)
112112
def withFilter(p: A => Boolean): WithFilter = new WithFilter(p)
@@ -147,7 +147,7 @@ class UnitTestSuite extends JUnitSuite {
147147
class ObservableWithException(s: Subscription, values: String*) extends Observable[String] {
148148
var t: Thread = null
149149

150-
override def subscribe(observer: Observer[String]): Subscription = {
150+
override def subscribe(observer: Observer[_ >: String]): Subscription = {
151151
println("ObservableWithException subscribed to ...")
152152
t = new Thread(new Runnable() {
153153
override def run() {
@@ -248,7 +248,7 @@ class UnitTestSuite extends JUnitSuite {
248248

249249
@Test def testFlattenMerge {
250250
val observable = Observable.from(Observable.from(1, 2, 3))
251-
val merged = Observable.merge(observable)
251+
val merged = Observable.merge[Int](observable)
252252
assertSubscribeReceives(merged)(1, 2, 3)
253253
}
254254

@@ -272,6 +272,18 @@ class UnitTestSuite extends JUnitSuite {
272272
assertSubscribeReceives(synchronized)(1, 2, 3)
273273
}
274274

275+
@Test def testZip2() {
276+
val colors: Observable[String] = Observable.from("red", "green", "blue")
277+
val names: Observable[String] = Observable.from("lion-o", "cheetara", "panthro")
278+
279+
case class Character(color: String, name: String)
280+
281+
val cheetara = Character("green", "cheetara")
282+
val panthro = Character("blue", "panthro")
283+
val characters = Observable.zip[String, String, Character](colors, names, Character.apply _)
284+
assertSubscribeReceives(characters)(cheetara, panthro)
285+
}
286+
275287
@Test def testZip3() {
276288
val numbers = Observable.from(1, 2, 3)
277289
val colors = Observable.from("red", "green", "blue")
@@ -283,7 +295,7 @@ class UnitTestSuite extends JUnitSuite {
283295
val cheetara = Character(2, "green", "cheetara")
284296
val panthro = Character(3, "blue", "panthro")
285297

286-
val characters = Observable.zip(numbers, colors, names, Character.apply _)
298+
val characters = Observable.zip[Int, String, String, Character](numbers, colors, names, Character.apply _)
287299
assertSubscribeReceives(characters)(liono, cheetara, panthro)
288300
}
289301

@@ -299,7 +311,7 @@ class UnitTestSuite extends JUnitSuite {
299311
val cheetara = Character(2, "green", "cheetara", false)
300312
val panthro = Character(3, "blue", "panthro", false)
301313

302-
val characters = Observable.zip(numbers, colors, names, isLeader, Character.apply _)
314+
val characters = Observable.zip[Int, String, String, Boolean, Character](numbers, colors, names, isLeader, Character.apply _)
303315
assertSubscribeReceives(characters)(liono, cheetara, panthro)
304316
}
305317

@@ -338,7 +350,8 @@ class UnitTestSuite extends JUnitSuite {
338350
@Test def testMap {
339351
val numbers = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
340352
val mappedNumbers = ArrayBuffer.empty[Int]
341-
numbers.map((x: Int) => x * x).subscribe((squareVal: Int) => {
353+
val mapped: Observable[Int] = numbers map ((x: Int) => x * x)
354+
mapped.subscribe((squareVal: Int) => {
342355
mappedNumbers.append(squareVal)
343356
})
344357
assertEquals(List(1, 4, 9, 16, 25, 36, 49, 64, 81), mappedNumbers.toList)
@@ -458,18 +471,9 @@ class UnitTestSuite extends JUnitSuite {
458471
assertSubscribeReceives(skipped)(3, 4)
459472
}
460473

461-
/**
462-
* Both testTake and testTakeWhileWithIndex exposed a bug with unsubscribes not properly propagating.
463-
* observable.take(2) produces onNext(first), onNext(second), and 4 onCompleteds
464-
* it should produce onNext(first), onNext(second), and 1 onCompleted
465-
*
466-
* Switching to Observable.create(OperationTake.take(observable, 2)) works as expected
467-
*/
468474
@Test def testTake {
469-
import rx.operators._
470-
471475
val observable = Observable.from(1, 2, 3, 4, 5)
472-
val took = Observable.create(OperationTake.take(observable, 2))
476+
val took = observable.take(2)
473477
assertSubscribeReceives(took)(1, 2)
474478
}
475479

@@ -479,11 +483,11 @@ class UnitTestSuite extends JUnitSuite {
479483
assertSubscribeReceives(took)(1, 3, 5)
480484
}
481485

482-
/*@Test def testTakeWhileWithIndex {
483-
val observable = Observable.from(1, 3, 5, 6, 7, 9, 11, 12, 13, 15, 17)
484-
val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx > 4)
485-
assertSubscribeReceives(took)(9, 11)
486-
}*/
486+
@Test def testTakeWhileWithIndex {
487+
val observable = Observable.from(1, 3, 5, 7, 9, 11, 12, 13, 15, 17)
488+
val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx < 8)
489+
assertSubscribeReceives(took)(1, 3, 5, 7, 9, 11)
490+
}
487491

488492
@Test def testTakeLast {
489493
val observable = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
@@ -559,7 +563,7 @@ class UnitTestSuite extends JUnitSuite {
559563

560564
@Test def testFilterInForComprehension {
561565
val doubler = (i: Int) => Observable.from(i, i)
562-
val filteredObservable = for {
566+
val filteredObservable: Observable[Int] = for {
563567
i: Int <- Observable.from(1, 2, 3, 4)
564568
j: Int <- doubler(i) if isOdd(i)
565569
} yield j

rxjava-contrib/rxjava-android/src/main/java/rx/android/concurrency/HandlerThreadScheduler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package rx.android.concurrency;
22

33
import android.os.Handler;
4+
45
import org.junit.Test;
56
import org.junit.runner.RunWith;
67
import org.mockito.ArgumentCaptor;
78
import org.robolectric.RobolectricTestRunner;
89
import org.robolectric.annotation.Config;
10+
911
import rx.Scheduler;
1012
import rx.Subscription;
1113
import rx.operators.SafeObservableSubscription;
@@ -39,7 +41,7 @@ public HandlerThreadScheduler(Handler handler) {
3941
* See {@link #schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)}
4042
*/
4143
@Override
42-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
44+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
4345
return schedule(state, action, 0L, TimeUnit.MILLISECONDS);
4446
}
4547

@@ -56,7 +58,7 @@ public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscr
5658
* @return A Subscription from which one can unsubscribe from.
5759
*/
5860
@Override
59-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
61+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
6062
final SafeObservableSubscription subscription = new SafeObservableSubscription();
6163
final Scheduler _scheduler = this;
6264
handler.postDelayed(new Runnable() {
@@ -76,6 +78,7 @@ public static final class UnitTest {
7678
public void shouldScheduleImmediateActionOnHandlerThread() {
7779
final Handler handler = mock(Handler.class);
7880
final Object state = new Object();
81+
@SuppressWarnings("unchecked")
7982
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);
8083

8184
Scheduler scheduler = new HandlerThreadScheduler(handler);
@@ -94,6 +97,7 @@ public void shouldScheduleImmediateActionOnHandlerThread() {
9497
public void shouldScheduleDelayedActionOnHandlerThread() {
9598
final Handler handler = mock(Handler.class);
9699
final Object state = new Object();
100+
@SuppressWarnings("unchecked")
97101
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);
98102

99103
Scheduler scheduler = new HandlerThreadScheduler(handler);

rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ private SwingScheduler() {
5555
}
5656

5757
@Override
58-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
58+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
5959
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
6060
EventQueue.invokeLater(new Runnable() {
6161
@Override
@@ -75,7 +75,7 @@ public void call() {
7575
}
7676

7777
@Override
78-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
78+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
7979
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
8080
long delay = unit.toMillis(dueTime);
8181
assertThatTheDelayIsValidForTheSwingTimer(delay);
@@ -113,7 +113,7 @@ public void call() {
113113
}
114114

115115
@Override
116-
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
116+
public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
117117
final AtomicReference<Timer> timer = new AtomicReference<Timer>();
118118

119119
final long delay = unit.toMillis(period);

rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/AbstractButtonSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ public enum AbstractButtonSource { ; // no instances
4242
* @see SwingObservable.fromButtonAction
4343
*/
4444
public static Observable<ActionEvent> fromActionOf(final AbstractButton button) {
45-
return Observable.create(new Func1<Observer<ActionEvent>, Subscription>() {
45+
return Observable.create(new Func1<Observer<? super ActionEvent>, Subscription>() {
4646
@Override
47-
public Subscription call(final Observer<ActionEvent> observer) {
47+
public Subscription call(final Observer<? super ActionEvent> observer) {
4848
final ActionListener listener = new ActionListener() {
4949
@Override
5050
public void actionPerformed(ActionEvent e) {

rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/ComponentEventSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ public enum ComponentEventSource { ; // no instances
3636
* @see SwingObservable.fromComponentEvents
3737
*/
3838
public static Observable<ComponentEvent> fromComponentEventsOf(final Component component) {
39-
return Observable.create(new Func1<Observer<ComponentEvent>, Subscription>() {
39+
return Observable.create(new Func1<Observer<? super ComponentEvent>, Subscription>() {
4040
@Override
41-
public Subscription call(final Observer<ComponentEvent> observer) {
41+
public Subscription call(final Observer<? super ComponentEvent> observer) {
4242
final ComponentListener listener = new ComponentListener() {
4343
@Override
4444
public void componentHidden(ComponentEvent event) {

rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/KeyEventSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ public enum KeyEventSource { ; // no instances
5050
* @see SwingObservable.fromKeyEvents(Component)
5151
*/
5252
public static Observable<KeyEvent> fromKeyEventsOf(final Component component) {
53-
return Observable.create(new Func1<Observer<KeyEvent>, Subscription>() {
53+
return Observable.create(new Func1<Observer<? super KeyEvent>, Subscription>() {
5454
@Override
55-
public Subscription call(final Observer<KeyEvent> observer) {
55+
public Subscription call(final Observer<? super KeyEvent> observer) {
5656
final KeyListener listener = new KeyListener() {
5757
@Override
5858
public void keyPressed(KeyEvent event) {

rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/MouseEventSource.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ public enum MouseEventSource { ; // no instances
3333
* @see SwingObservable.fromMouseEvents
3434
*/
3535
public static Observable<MouseEvent> fromMouseEventsOf(final Component component) {
36-
return Observable.create(new Func1<Observer<MouseEvent>, Subscription>() {
36+
return Observable.create(new Func1<Observer<? super MouseEvent>, Subscription>() {
3737
@Override
38-
public Subscription call(final Observer<MouseEvent> observer) {
38+
public Subscription call(final Observer<? super MouseEvent> observer) {
3939
final MouseListener listener = new MouseListener() {
4040
@Override
4141
public void mouseClicked(MouseEvent event) {
@@ -78,9 +78,9 @@ public void call() {
7878
* @see SwingObservable.fromMouseMotionEvents
7979
*/
8080
public static Observable<MouseEvent> fromMouseMotionEventsOf(final Component component) {
81-
return Observable.create(new Func1<Observer<MouseEvent>, Subscription>() {
81+
return Observable.create(new Func1<Observer<? super MouseEvent>, Subscription>() {
8282
@Override
83-
public Subscription call(final Observer<MouseEvent> observer) {
83+
public Subscription call(final Observer<? super MouseEvent> observer) {
8484
final MouseMotionListener listener = new MouseMotionListener() {
8585
@Override
8686
public void mouseDragged(MouseEvent event) {

0 commit comments

Comments
 (0)