From 5e6267fde8189d7fc3af1e6aa9dba48c14cbe1b4 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 11 Nov 2019 13:59:29 +0100 Subject: [PATCH] 3.x: Fix concurrent clear in observeOn while output-fused --- .../operators/flowable/FlowableObserveOn.java | 2 +- .../observable/ObservableObserveOn.java | 2 +- .../flowable/FlowableObserveOnTest.java | 34 +++++++++++++++++++ .../observable/ObservableObserveOnTest.java | 34 ++++++++++++++++++- 4 files changed, 69 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOn.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOn.java index 6fe64c3ff8..66605c9520 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOn.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOn.java @@ -154,7 +154,7 @@ public final void cancel() { upstream.cancel(); worker.dispose(); - if (getAndIncrement() == 0) { + if (!outputFused && getAndIncrement() == 0) { queue.clear(); } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableObserveOn.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableObserveOn.java index f36325e8d0..9bc2b39117 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableObserveOn.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableObserveOn.java @@ -145,7 +145,7 @@ public void dispose() { disposed = true; upstream.dispose(); worker.dispose(); - if (getAndIncrement() == 0) { + if (!outputFused && getAndIncrement() == 0) { queue.clear(); } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java index ce81203a84..1c8763b33f 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java @@ -35,6 +35,7 @@ import io.reactivex.rxjava3.internal.operators.flowable.FlowableObserveOn.BaseObserveOnSubscriber; import io.reactivex.rxjava3.internal.schedulers.ImmediateThinScheduler; import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; +import io.reactivex.rxjava3.observers.TestObserver; import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.processors.*; import io.reactivex.rxjava3.schedulers.*; @@ -1938,4 +1939,37 @@ public void workerNotDisposedPrematurelyNormalInAsyncOutConditional() { assertEquals(1, s.disposedCount.get()); } + + @Test + public void fusedNoConcurrentCleanDueToCancel() { + for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) { + List errors = TestHelper.trackPluginErrors(); + try { + final UnicastProcessor up = UnicastProcessor.create(); + + TestObserver to = up.hide() + .observeOn(Schedulers.io()) + .observeOn(Schedulers.single()) + .unsubscribeOn(Schedulers.computation()) + .firstOrError() + .test(); + + for (int i = 0; up.hasSubscribers() && i < 10000; i++) { + up.onNext(i); + } + + to + .awaitDone(5, TimeUnit.SECONDS) + ; + + if (!errors.isEmpty()) { + throw new CompositeException(errors); + } + + to.assertResult(0); + } finally { + RxJavaPlugins.reset(); + } + } + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableObserveOnTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableObserveOnTest.java index c51d36f0d0..0602249ceb 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableObserveOnTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableObserveOnTest.java @@ -29,7 +29,7 @@ import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Observer; import io.reactivex.rxjava3.disposables.*; -import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.fuseable.*; import io.reactivex.rxjava3.internal.operators.flowable.FlowableObserveOnTest.DisposeTrackingScheduler; @@ -815,4 +815,36 @@ public void workerNotDisposedPrematurelyNormalInAsyncOut() { assertEquals(1, s.disposedCount.get()); } + @Test + public void fusedNoConcurrentCleanDueToCancel() { + for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) { + List errors = TestHelper.trackPluginErrors(); + try { + final UnicastSubject us = UnicastSubject.create(); + + TestObserver to = us.hide() + .observeOn(Schedulers.io()) + .observeOn(Schedulers.single()) + .unsubscribeOn(Schedulers.computation()) + .firstOrError() + .test(); + + for (int i = 0; us.hasObservers() && i < 10000; i++) { + us.onNext(i); + } + + to + .awaitDone(5, TimeUnit.SECONDS) + ; + + if (!errors.isEmpty()) { + throw new CompositeException(errors); + } + + to.assertResult(0); + } finally { + RxJavaPlugins.reset(); + } + } + } }