From db7ae4bb4fa2500f070cd16cfd6cd03749aac8a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Tue, 10 Dec 2019 23:25:32 +0100 Subject: [PATCH] 3.x: Fix Observable amb, combineLatest & zip ArrayStoreException --- .../operators/observable/ObservableAmb.java | 2 +- .../observable/ObservableCombineLatest.java | 2 +- .../operators/observable/ObservableZip.java | 2 +- .../completable/CompletableAmbTest.java | 14 +++++++++++++ .../operators/flowable/FlowableAmbTest.java | 15 ++++++++++++++ .../flowable/FlowableCombineLatestTest.java | 20 +++++++++++++++++++ .../operators/flowable/FlowableZipTest.java | 20 +++++++++++++++++++ .../operators/maybe/MaybeAmbTest.java | 15 ++++++++++++++ .../operators/maybe/MaybeZipIterableTest.java | 20 +++++++++++++++++++ .../observable/ObservableAmbTest.java | 14 +++++++++++++ .../ObservableCombineLatestTest.java | 20 +++++++++++++++++++ .../observable/ObservableZipTest.java | 20 +++++++++++++++++++ .../operators/single/SingleAmbTest.java | 15 ++++++++++++++ .../single/SingleZipIterableTest.java | 20 +++++++++++++++++++ 14 files changed, 196 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmb.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmb.java index adb85e3d71..aa8fae849d 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmb.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmb.java @@ -36,7 +36,7 @@ public void subscribeActual(Observer observer) { ObservableSource[] sources = this.sources; int count = 0; if (sources == null) { - sources = new Observable[8]; + sources = new ObservableSource[8]; try { for (ObservableSource p : sourcesIterable) { if (p == null) { diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatest.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatest.java index ac18b94d79..6a379d142c 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatest.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatest.java @@ -48,7 +48,7 @@ public void subscribeActual(Observer observer) { ObservableSource[] sources = this.sources; int count = 0; if (sources == null) { - sources = new Observable[8]; + sources = new ObservableSource[8]; for (ObservableSource p : sourcesIterable) { if (count == sources.length) { ObservableSource[] b = new ObservableSource[count + (count >> 2)]; diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZip.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZip.java index faae21a6e4..b3d40dc2ed 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZip.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZip.java @@ -50,7 +50,7 @@ public void subscribeActual(Observer observer) { ObservableSource[] sources = this.sources; int count = 0; if (sources == null) { - sources = new Observable[8]; + sources = new ObservableSource[8]; for (ObservableSource p : sourcesIterable) { if (count == sources.length) { ObservableSource[] b = new ObservableSource[count + (count >> 2)]; diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableAmbTest.java index 02e607e6c8..643ae8a880 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableAmbTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableAmbTest.java @@ -315,4 +315,18 @@ public void run() throws Exception { assertFalse("Interrupted!", interrupted.get()); } } + + @Test + public void completableSourcesInIterable() { + CompletableSource source = new CompletableSource() { + @Override + public void subscribe(CompletableObserver observer) { + Completable.complete().subscribe(observer); + } + }; + + Completable.amb(Arrays.asList(source, source)) + .test() + .assertResult(); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java index 149187b573..72073f3150 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java @@ -667,4 +667,19 @@ public void run() throws Exception { assertFalse("Interrupted!", interrupted.get()); } } + + @SuppressWarnings("unchecked") + @Test + public void publishersInIterable() { + Publisher source = new Publisher() { + @Override + public void subscribe(Subscriber subscriber) { + Flowable.just(1).subscribe(subscriber); + } + }; + + Flowable.amb(Arrays.asList(source, source)) + .test() + .assertResult(1); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java index 53f26254d8..836fd8a4c3 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java @@ -1551,4 +1551,24 @@ public Object apply(Object[] a) throws Exception { .awaitDone(5, TimeUnit.SECONDS) .assertFailure(TestException.class, 42); } + + @SuppressWarnings("unchecked") + @Test + public void publishersInIterable() { + Publisher source = new Publisher() { + @Override + public void subscribe(Subscriber subscriber) { + Flowable.just(1).subscribe(subscriber); + } + }; + + Flowable.combineLatest(Arrays.asList(source, source), new Function() { + @Override + public Integer apply(Object[] t) throws Throwable { + return 2; + } + }) + .test() + .assertResult(2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java index 7aa44b7534..d6921bfd91 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java @@ -1896,4 +1896,24 @@ public Object apply(Object[] a) throws Exception { assertEquals(0, counter.get()); } + + @SuppressWarnings("unchecked") + @Test + public void publishersInIterable() { + Publisher source = new Publisher() { + @Override + public void subscribe(Subscriber subscriber) { + Flowable.just(1).subscribe(subscriber); + } + }; + + Flowable.zip(Arrays.asList(source, source), new Function() { + @Override + public Integer apply(Object[] t) throws Throwable { + return 2; + } + }) + .test() + .assertResult(2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeAmbTest.java index 3b89b06b27..36addaab95 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeAmbTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeAmbTest.java @@ -254,4 +254,19 @@ public void run() { } } } + + @SuppressWarnings("unchecked") + @Test + public void maybeSourcesInIterable() { + MaybeSource source = new MaybeSource() { + @Override + public void subscribe(MaybeObserver observer) { + Maybe.just(1).subscribe(observer); + } + }; + + Maybe.amb(Arrays.asList(source, source)) + .test() + .assertResult(1); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeZipIterableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeZipIterableTest.java index 934e2e512c..9ce40cee25 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeZipIterableTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeZipIterableTest.java @@ -221,4 +221,24 @@ public void singleSourceZipperReturnsNull() { .to(TestHelper.testConsumer()) .assertFailureAndMessage(NullPointerException.class, "The zipper returned a null value"); } + + @SuppressWarnings("unchecked") + @Test + public void maybeSourcesInIterable() { + MaybeSource source = new MaybeSource() { + @Override + public void subscribe(MaybeObserver observer) { + Maybe.just(1).subscribe(observer); + } + }; + + Maybe.zip(Arrays.asList(source, source), new Function() { + @Override + public Integer apply(Object[] t) throws Throwable { + return 2; + } + }) + .test() + .assertResult(2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java index 29287a7772..14c720257a 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java @@ -467,4 +467,18 @@ public void run() throws Exception { } } + @SuppressWarnings("unchecked") + @Test + public void observableSourcesInIterable() { + ObservableSource source = new ObservableSource() { + @Override + public void subscribe(Observer observer) { + Observable.just(1).subscribe(observer); + } + }; + + Observable.amb(Arrays.asList(source, source)) + .test() + .assertResult(1); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java index 3086e3941d..4dca384928 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java @@ -1212,4 +1212,24 @@ public Object apply(Object[] a) throws Exception { .awaitDone(5, TimeUnit.SECONDS) .assertFailure(TestException.class, 42); } + + @SuppressWarnings("unchecked") + @Test + public void observableSourcesInIterable() { + ObservableSource source = new ObservableSource() { + @Override + public void subscribe(Observer observer) { + Observable.just(1).subscribe(observer); + } + }; + + Observable.combineLatest(Arrays.asList(source, source), new Function() { + @Override + public Integer apply(Object[] t) throws Throwable { + return 2; + } + }) + .test() + .assertResult(2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java index 369840cbc7..e185dea68d 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java @@ -1429,4 +1429,24 @@ public Object apply(Object[] a) throws Exception { assertEquals(0, counter.get()); } + + @SuppressWarnings("unchecked") + @Test + public void observableSourcesInIterable() { + ObservableSource source = new ObservableSource() { + @Override + public void subscribe(Observer observer) { + Observable.just(1).subscribe(observer); + } + }; + + Observable.zip(Arrays.asList(source, source), new Function() { + @Override + public Integer apply(Object[] t) throws Throwable { + return 2; + } + }) + .test() + .assertResult(2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java index bb333f45f2..a6caec6171 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java @@ -342,4 +342,19 @@ public void accept(Object v, Throwable e) throws Exception { assertFalse("Interrupted!", interrupted.get()); } } + + @SuppressWarnings("unchecked") + @Test + public void singleSourcesInIterable() { + SingleSource source = new SingleSource() { + @Override + public void subscribe(SingleObserver observer) { + Single.just(1).subscribe(observer); + } + }; + + Single.amb(Arrays.asList(source, source)) + .test() + .assertResult(1); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleZipIterableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleZipIterableTest.java index 4b49394242..1a94e23257 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleZipIterableTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleZipIterableTest.java @@ -245,4 +245,24 @@ public void singleSourceZipperReturnsNull() { .to(TestHelper.testConsumer()) .assertFailureAndMessage(NullPointerException.class, "The zipper returned a null value"); } + + @SuppressWarnings("unchecked") + @Test + public void singleSourcesInIterable() { + SingleSource source = new SingleSource() { + @Override + public void subscribe(SingleObserver observer) { + Single.just(1).subscribe(observer); + } + }; + + Single.zip(Arrays.asList(source, source), new Function() { + @Override + public Integer apply(Object[] t) throws Throwable { + return 2; + } + }) + .test() + .assertResult(2); + } }