From bfc2775a83c21fe356bd546a16139b2465a29efc Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 21 Aug 2019 14:51:20 +0200 Subject: [PATCH 1/4] 3.x: Fix blockingIterable not unblocking when force-disposed --- .../flowable/BlockingFlowableIterable.java | 14 +++++++-- .../BlockingObservableIterable.java | 10 ++++++- .../BlockingFlowableToIteratorTest.java | 28 ++++++++++++++++++ .../BlockingObservableNextTest.java | 7 ++--- .../BlockingObservableToIteratorTest.java | 29 ++++++++++++++++++- 5 files changed, 79 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java index 811ced15f3..1457d0fbda 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java @@ -21,7 +21,7 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.Disposable; -import io.reactivex.rxjava3.exceptions.MissingBackpressureException; +import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.internal.queue.SpscArrayQueue; import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; import io.reactivex.rxjava3.internal.util.*; @@ -62,7 +62,7 @@ static final class BlockingFlowableIterator long produced; volatile boolean done; - Throwable error; + volatile Throwable error; BlockingFlowableIterator(int batchSize) { this.queue = new SpscArrayQueue(batchSize); @@ -75,6 +75,13 @@ static final class BlockingFlowableIterator @Override public boolean hasNext() { for (;;) { + if (isDisposed()) { + Throwable e = error; + if (e != null) { + throw ExceptionHelper.wrapOrThrow(e); + } + return false; + } boolean d = done; boolean empty = queue.isEmpty(); if (d) { @@ -90,7 +97,7 @@ public boolean hasNext() { BlockingHelper.verifyNonBlocking(); lock.lock(); try { - while (!done && queue.isEmpty()) { + while (!done && queue.isEmpty() && !isDisposed()) { condition.await(); } } catch (InterruptedException ex) { @@ -175,6 +182,7 @@ public void remove() { @Override public void dispose() { SubscriptionHelper.cancel(this); + signalConsumer(); // just in case it is currently blocking in hasNext } @Override diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java index b190896f2b..e92a09e4e0 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java @@ -64,6 +64,13 @@ static final class BlockingObservableIterator @Override public boolean hasNext() { for (;;) { + if (isDisposed()) { + Throwable e = error; + if (e != null) { + throw ExceptionHelper.wrapOrThrow(e); + } + return false; + } boolean d = done; boolean empty = queue.isEmpty(); if (d) { @@ -80,7 +87,7 @@ public boolean hasNext() { BlockingHelper.verifyNonBlocking(); lock.lock(); try { - while (!done && queue.isEmpty()) { + while (!done && queue.isEmpty() && !isDisposed()) { condition.await(); } } finally { @@ -146,6 +153,7 @@ public void remove() { @Override public void dispose() { DisposableHelper.dispose(this); + signalConsumer(); // just in case it is currently blocking in hasNext } @Override diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java index abb445c149..20bfcf9962 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java @@ -16,14 +16,18 @@ import static org.junit.Assert.*; import java.util.*; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.reactivestreams.*; import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.internal.operators.flowable.BlockingFlowableIterable.BlockingFlowableIterator; import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; +import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.schedulers.Schedulers; public class BlockingFlowableToIteratorTest extends RxJavaTest { @@ -163,4 +167,28 @@ protected void subscribeActual(Subscriber s) { it.next(); } + + @Test(expected = NoSuchElementException.class) + public void disposedIteratorHasNextReturns() { + Iterator it = PublishProcessor.create() + .blockingIterable().iterator(); + ((Disposable)it).dispose(); + assertFalse(it.hasNext()); + it.next(); + } + + @Test + public void asyncDisposeUnblocks() { + final Iterator it = PublishProcessor.create() + .blockingIterable().iterator(); + + Schedulers.single().scheduleDirect(new Runnable() { + @Override + public void run() { + ((Disposable)it).dispose(); + } + }, 1, TimeUnit.SECONDS); + + assertFalse(it.hasNext()); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableNextTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableNextTest.java index 938f757a0c..194619b8bc 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableNextTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableNextTest.java @@ -28,7 +28,6 @@ import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.internal.operators.observable.BlockingObservableNext.NextObserver; import io.reactivex.rxjava3.plugins.RxJavaPlugins; -import io.reactivex.rxjava3.processors.BehaviorProcessor; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subjects.*; import io.reactivex.rxjava3.testsupport.TestHelper; @@ -333,9 +332,9 @@ public void singleSourceManyIterators() throws InterruptedException { @Test public void synchronousNext() { - assertEquals(1, BehaviorProcessor.createDefault(1).take(1).blockingSingle().intValue()); - assertEquals(2, BehaviorProcessor.createDefault(2).blockingIterable().iterator().next().intValue()); - assertEquals(3, BehaviorProcessor.createDefault(3).blockingNext().iterator().next().intValue()); + assertEquals(1, BehaviorSubject.createDefault(1).take(1).blockingSingle().intValue()); + assertEquals(2, BehaviorSubject.createDefault(2).blockingIterable().iterator().next().intValue()); + assertEquals(3, BehaviorSubject.createDefault(3).blockingNext().iterator().next().intValue()); } @Test diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableToIteratorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableToIteratorTest.java index d904ed30dd..3a98208d70 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableToIteratorTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableToIteratorTest.java @@ -16,15 +16,18 @@ import static org.junit.Assert.*; import java.util.*; +import java.util.concurrent.TimeUnit; import org.junit.Test; import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Observer; -import io.reactivex.rxjava3.disposables.Disposables; +import io.reactivex.rxjava3.disposables.*; import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.internal.operators.observable.BlockingObservableIterable.BlockingObservableIterator; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.subjects.PublishSubject; public class BlockingObservableToIteratorTest extends RxJavaTest { @@ -104,4 +107,28 @@ public void remove() { BlockingObservableIterator it = new BlockingObservableIterator(128); it.remove(); } + + @Test(expected = NoSuchElementException.class) + public void disposedIteratorHasNextReturns() { + Iterator it = PublishSubject.create() + .blockingIterable().iterator(); + ((Disposable)it).dispose(); + assertFalse(it.hasNext()); + it.next(); + } + + @Test + public void asyncDisposeUnblocks() { + final Iterator it = PublishSubject.create() + .blockingIterable().iterator(); + + Schedulers.single().scheduleDirect(new Runnable() { + @Override + public void run() { + ((Disposable)it).dispose(); + } + }, 1, TimeUnit.SECONDS); + + assertFalse(it.hasNext()); + } } From 6ddfde18874d8ea025c7bc00093e2083fe65f29a Mon Sep 17 00:00:00 2001 From: David Karnok Date: Wed, 21 Aug 2019 15:18:10 +0200 Subject: [PATCH 2/4] Update src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java Co-Authored-By: Niklas Baudy --- .../internal/operators/flowable/BlockingFlowableIterable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java index 1457d0fbda..151541f63f 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java @@ -182,7 +182,7 @@ public void remove() { @Override public void dispose() { SubscriptionHelper.cancel(this); - signalConsumer(); // just in case it is currently blocking in hasNext + signalConsumer(); // Just in case it is currently blocking in hasNext. } @Override From e52a5b9657b785edeaa7f94ac5a63e000abca135 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Wed, 21 Aug 2019 15:18:19 +0200 Subject: [PATCH 3/4] Update src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java Co-Authored-By: Niklas Baudy --- .../operators/observable/BlockingObservableIterable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java index e92a09e4e0..2699a9d8b4 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java @@ -153,7 +153,7 @@ public void remove() { @Override public void dispose() { DisposableHelper.dispose(this); - signalConsumer(); // just in case it is currently blocking in hasNext + signalConsumer(); // Just in case it is currently blocking in hasNext. } @Override From cc72bd5e30116755b719c0d10234dd08d1ae2b27 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Wed, 21 Aug 2019 15:25:30 +0200 Subject: [PATCH 4/4] missed a volatile --- .../operators/observable/BlockingObservableIterable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java index 2699a9d8b4..0ba2da1720 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java @@ -53,7 +53,7 @@ static final class BlockingObservableIterator final Condition condition; volatile boolean done; - Throwable error; + volatile Throwable error; BlockingObservableIterator(int batchSize) { this.queue = new SpscLinkedArrayQueue(batchSize);