Skip to content

Commit c98416b

Browse files
authored
3.x: Fix blockingIterable not unblocking when force-disposed (#6626)
* 3.x: Fix blockingIterable not unblocking when force-disposed * Update src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java Co-Authored-By: Niklas Baudy <[email protected]> * Update src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java Co-Authored-By: Niklas Baudy <[email protected]> * missed a volatile
1 parent 3318b1b commit c98416b

File tree

5 files changed

+80
-10
lines changed

5 files changed

+80
-10
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import io.reactivex.rxjava3.core.*;
2323
import io.reactivex.rxjava3.disposables.Disposable;
24-
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
24+
import io.reactivex.rxjava3.exceptions.*;
2525
import io.reactivex.rxjava3.internal.queue.SpscArrayQueue;
2626
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
2727
import io.reactivex.rxjava3.internal.util.*;
@@ -62,7 +62,7 @@ static final class BlockingFlowableIterator<T>
6262
long produced;
6363

6464
volatile boolean done;
65-
Throwable error;
65+
volatile Throwable error;
6666

6767
BlockingFlowableIterator(int batchSize) {
6868
this.queue = new SpscArrayQueue<T>(batchSize);
@@ -75,6 +75,13 @@ static final class BlockingFlowableIterator<T>
7575
@Override
7676
public boolean hasNext() {
7777
for (;;) {
78+
if (isDisposed()) {
79+
Throwable e = error;
80+
if (e != null) {
81+
throw ExceptionHelper.wrapOrThrow(e);
82+
}
83+
return false;
84+
}
7885
boolean d = done;
7986
boolean empty = queue.isEmpty();
8087
if (d) {
@@ -90,7 +97,7 @@ public boolean hasNext() {
9097
BlockingHelper.verifyNonBlocking();
9198
lock.lock();
9299
try {
93-
while (!done && queue.isEmpty()) {
100+
while (!done && queue.isEmpty() && !isDisposed()) {
94101
condition.await();
95102
}
96103
} catch (InterruptedException ex) {
@@ -175,6 +182,7 @@ public void remove() {
175182
@Override
176183
public void dispose() {
177184
SubscriptionHelper.cancel(this);
185+
signalConsumer(); // Just in case it is currently blocking in hasNext.
178186
}
179187

180188
@Override

src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ static final class BlockingObservableIterator<T>
5353
final Condition condition;
5454

5555
volatile boolean done;
56-
Throwable error;
56+
volatile Throwable error;
5757

5858
BlockingObservableIterator(int batchSize) {
5959
this.queue = new SpscLinkedArrayQueue<T>(batchSize);
@@ -64,6 +64,13 @@ static final class BlockingObservableIterator<T>
6464
@Override
6565
public boolean hasNext() {
6666
for (;;) {
67+
if (isDisposed()) {
68+
Throwable e = error;
69+
if (e != null) {
70+
throw ExceptionHelper.wrapOrThrow(e);
71+
}
72+
return false;
73+
}
6774
boolean d = done;
6875
boolean empty = queue.isEmpty();
6976
if (d) {
@@ -80,7 +87,7 @@ public boolean hasNext() {
8087
BlockingHelper.verifyNonBlocking();
8188
lock.lock();
8289
try {
83-
while (!done && queue.isEmpty()) {
90+
while (!done && queue.isEmpty() && !isDisposed()) {
8491
condition.await();
8592
}
8693
} finally {
@@ -146,6 +153,7 @@ public void remove() {
146153
@Override
147154
public void dispose() {
148155
DisposableHelper.dispose(this);
156+
signalConsumer(); // Just in case it is currently blocking in hasNext.
149157
}
150158

151159
@Override

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616
import static org.junit.Assert.*;
1717

1818
import java.util.*;
19+
import java.util.concurrent.TimeUnit;
1920

2021
import org.junit.Test;
2122
import org.reactivestreams.*;
2223

2324
import io.reactivex.rxjava3.core.*;
25+
import io.reactivex.rxjava3.disposables.Disposable;
2426
import io.reactivex.rxjava3.exceptions.*;
2527
import io.reactivex.rxjava3.internal.operators.flowable.BlockingFlowableIterable.BlockingFlowableIterator;
2628
import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription;
29+
import io.reactivex.rxjava3.processors.PublishProcessor;
30+
import io.reactivex.rxjava3.schedulers.Schedulers;
2731

2832
public class BlockingFlowableToIteratorTest extends RxJavaTest {
2933

@@ -163,4 +167,28 @@ protected void subscribeActual(Subscriber<? super Integer> s) {
163167

164168
it.next();
165169
}
170+
171+
@Test(expected = NoSuchElementException.class)
172+
public void disposedIteratorHasNextReturns() {
173+
Iterator<Integer> it = PublishProcessor.<Integer>create()
174+
.blockingIterable().iterator();
175+
((Disposable)it).dispose();
176+
assertFalse(it.hasNext());
177+
it.next();
178+
}
179+
180+
@Test
181+
public void asyncDisposeUnblocks() {
182+
final Iterator<Integer> it = PublishProcessor.<Integer>create()
183+
.blockingIterable().iterator();
184+
185+
Schedulers.single().scheduleDirect(new Runnable() {
186+
@Override
187+
public void run() {
188+
((Disposable)it).dispose();
189+
}
190+
}, 1, TimeUnit.SECONDS);
191+
192+
assertFalse(it.hasNext());
193+
}
166194
}

src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableNextTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.reactivex.rxjava3.exceptions.TestException;
2929
import io.reactivex.rxjava3.internal.operators.observable.BlockingObservableNext.NextObserver;
3030
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
31-
import io.reactivex.rxjava3.processors.BehaviorProcessor;
3231
import io.reactivex.rxjava3.schedulers.Schedulers;
3332
import io.reactivex.rxjava3.subjects.*;
3433
import io.reactivex.rxjava3.testsupport.TestHelper;
@@ -333,9 +332,9 @@ public void singleSourceManyIterators() throws InterruptedException {
333332

334333
@Test
335334
public void synchronousNext() {
336-
assertEquals(1, BehaviorProcessor.createDefault(1).take(1).blockingSingle().intValue());
337-
assertEquals(2, BehaviorProcessor.createDefault(2).blockingIterable().iterator().next().intValue());
338-
assertEquals(3, BehaviorProcessor.createDefault(3).blockingNext().iterator().next().intValue());
335+
assertEquals(1, BehaviorSubject.createDefault(1).take(1).blockingSingle().intValue());
336+
assertEquals(2, BehaviorSubject.createDefault(2).blockingIterable().iterator().next().intValue());
337+
assertEquals(3, BehaviorSubject.createDefault(3).blockingNext().iterator().next().intValue());
339338
}
340339

341340
@Test

src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableToIteratorTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616
import static org.junit.Assert.*;
1717

1818
import java.util.*;
19+
import java.util.concurrent.TimeUnit;
1920

2021
import org.junit.Test;
2122

2223
import io.reactivex.rxjava3.core.*;
2324
import io.reactivex.rxjava3.core.Observable;
2425
import io.reactivex.rxjava3.core.Observer;
25-
import io.reactivex.rxjava3.disposables.Disposables;
26+
import io.reactivex.rxjava3.disposables.*;
2627
import io.reactivex.rxjava3.exceptions.TestException;
2728
import io.reactivex.rxjava3.internal.operators.observable.BlockingObservableIterable.BlockingObservableIterator;
29+
import io.reactivex.rxjava3.schedulers.Schedulers;
30+
import io.reactivex.rxjava3.subjects.PublishSubject;
2831

2932
public class BlockingObservableToIteratorTest extends RxJavaTest {
3033

@@ -104,4 +107,28 @@ public void remove() {
104107
BlockingObservableIterator<Integer> it = new BlockingObservableIterator<Integer>(128);
105108
it.remove();
106109
}
110+
111+
@Test(expected = NoSuchElementException.class)
112+
public void disposedIteratorHasNextReturns() {
113+
Iterator<Integer> it = PublishSubject.<Integer>create()
114+
.blockingIterable().iterator();
115+
((Disposable)it).dispose();
116+
assertFalse(it.hasNext());
117+
it.next();
118+
}
119+
120+
@Test
121+
public void asyncDisposeUnblocks() {
122+
final Iterator<Integer> it = PublishSubject.<Integer>create()
123+
.blockingIterable().iterator();
124+
125+
Schedulers.single().scheduleDirect(new Runnable() {
126+
@Override
127+
public void run() {
128+
((Disposable)it).dispose();
129+
}
130+
}, 1, TimeUnit.SECONDS);
131+
132+
assertFalse(it.hasNext());
133+
}
107134
}

0 commit comments

Comments
 (0)