diff --git a/src/main/java/io/reactivex/rxjava3/disposables/AutoCloseableDisposable.java b/src/main/java/io/reactivex/rxjava3/disposables/AutoCloseableDisposable.java index 8c0f2be2dc..08cbe9cd99 100644 --- a/src/main/java/io/reactivex/rxjava3/disposables/AutoCloseableDisposable.java +++ b/src/main/java/io/reactivex/rxjava3/disposables/AutoCloseableDisposable.java @@ -14,7 +14,7 @@ package io.reactivex.rxjava3.disposables; import io.reactivex.rxjava3.annotations.NonNull; -import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.internal.util.ExceptionHelper; /** * A disposable container that manages an {@link AutoCloseable} instance. @@ -33,7 +33,7 @@ protected void onDisposed(@NonNull AutoCloseable value) { try { value.close(); } catch (Throwable ex) { - RxJavaPlugins.onError(ex); + throw ExceptionHelper.wrapOrThrow(ex); } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java index c967a011e5..b01b24748a 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java @@ -21,7 +21,7 @@ import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.*; -import io.reactivex.rxjava3.exceptions.MissingBackpressureException; +import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.fuseable.*; import io.reactivex.rxjava3.internal.queue.SpscArrayQueue; @@ -61,6 +61,7 @@ protected void subscribeActual(Subscriber s) { stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream"); } } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); EmptySubscription.error(ex, s); return; } @@ -243,6 +244,7 @@ void drain() { try { t = queue.poll(); } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); trySignalError(downstream, ex); continue; } @@ -271,6 +273,7 @@ else if (!isEmpty) { iterator = null; } } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); trySignalError(downstream, ex); } continue; @@ -282,6 +285,7 @@ else if (!isEmpty) { try { item = Objects.requireNonNull(iterator.next(), "The Stream.Iterator returned a null value"); } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); trySignalError(downstream, ex); continue; } @@ -297,6 +301,7 @@ else if (!isEmpty) { clearCurrentRethrowCloseError(); } } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); trySignalError(downstream, ex); } } @@ -328,6 +333,7 @@ void clearCurrentSuppressCloseError() { try { clearCurrentRethrowCloseError(); } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); RxJavaPlugins.onError(ex); } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/ObservableFlatMapStream.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/ObservableFlatMapStream.java index 9b7b446e28..968e581625 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/jdk8/ObservableFlatMapStream.java +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/ObservableFlatMapStream.java @@ -55,6 +55,7 @@ protected void subscribeActual(Observer observer) { stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream"); } } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); EmptyDisposable.error(ex, observer); return; } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCollect.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCollect.java index 80345f2f3b..c2560cf6e3 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCollect.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCollect.java @@ -39,6 +39,7 @@ protected void subscribeActual(Subscriber s) { try { u = Objects.requireNonNull(initialSupplier.get(), "The initial value supplied is null"); } catch (Throwable e) { + Exceptions.throwIfFatal(e); EmptySubscription.error(e, s); return; } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCollectSingle.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCollectSingle.java index fcb5590b49..edf17d95ef 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCollectSingle.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCollectSingle.java @@ -44,6 +44,7 @@ protected void subscribeActual(SingleObserver observer) { try { u = Objects.requireNonNull(initialSupplier.get(), "The initialSupplier returned a null value"); } catch (Throwable e) { + Exceptions.throwIfFatal(e); EmptyDisposable.error(e, observer); return; } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDoOnEach.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDoOnEach.java index b6900b077e..74d74f41cf 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDoOnEach.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDoOnEach.java @@ -159,6 +159,7 @@ public T poll() throws Throwable { try { onError.accept(ex); } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); throw new CompositeException(ex, exc); } throw ExceptionHelper.throwIfThrowable(ex); @@ -173,6 +174,7 @@ public T poll() throws Throwable { try { onError.accept(ex); } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); throw new CompositeException(ex, exc); } throw ExceptionHelper.throwIfThrowable(ex); @@ -314,6 +316,7 @@ public T poll() throws Throwable { try { onError.accept(ex); } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); throw new CompositeException(ex, exc); } throw ExceptionHelper.throwIfThrowable(ex); @@ -328,6 +331,7 @@ public T poll() throws Throwable { try { onError.accept(ex); } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); throw new CompositeException(ex, exc); } throw ExceptionHelper.throwIfThrowable(ex); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplay.java index 8d9e2e8460..6a352e2fc4 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplay.java @@ -209,6 +209,7 @@ public void connect(Consumer connection) { try { connection.accept(ps); } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); if (doConnect) { ps.shouldConnect.compareAndSet(true, false); } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableScalarXMap.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableScalarXMap.java index 2ef89c0023..d78e83a3d2 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableScalarXMap.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableScalarXMap.java @@ -136,6 +136,7 @@ public void subscribeActual(Subscriber s) { try { other = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null Publisher"); } catch (Throwable e) { + Exceptions.throwIfFatal(e); EmptySubscription.error(e, s); return; } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java index 4e38351ef7..d55198010a 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java @@ -245,6 +245,7 @@ void drain() { try { endSource = Objects.requireNonNull(closingIndicator.apply(startItem), "The closingIndicator returned a null Publisher"); } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); upstream.cancel(); startSubscriber.cancel(); resources.dispose(); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipIterable.java index a41c591fe2..1fb9777a50 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipIterable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipIterable.java @@ -101,7 +101,7 @@ public void onNext(T t) { try { u = Objects.requireNonNull(iterator.next(), "The iterator returned a null value"); } catch (Throwable e) { - error(e); + fail(e); return; } @@ -109,7 +109,7 @@ public void onNext(T t) { try { v = Objects.requireNonNull(zipper.apply(t, u), "The zipper function returned a null value"); } catch (Throwable e) { - error(e); + fail(e); return; } @@ -120,7 +120,7 @@ public void onNext(T t) { try { b = iterator.hasNext(); } catch (Throwable e) { - error(e); + fail(e); return; } @@ -131,7 +131,7 @@ public void onNext(T t) { } } - void error(Throwable e) { + void fail(Throwable e) { Exceptions.throwIfFatal(e); done = true; upstream.cancel(); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromFuture.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromFuture.java index 5d4dbc0f0b..acc93b3ea6 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromFuture.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromFuture.java @@ -52,6 +52,7 @@ protected void subscribeActual(MaybeObserver observer) { v = future.get(timeout, unit); } } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); if (ex instanceof ExecutionException) { ex = ex.getCause(); } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBuffer.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBuffer.java index 2b09748a61..bd7f4b1baa 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBuffer.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBuffer.java @@ -186,6 +186,7 @@ public void onNext(T t) { try { b = ExceptionHelper.nullCheck(bufferSupplier.get(), "The bufferSupplier returned a null Collection."); } catch (Throwable e) { + Exceptions.throwIfFatal(e); buffers.clear(); upstream.dispose(); downstream.onError(e); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCollect.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCollect.java index 6471d7538c..76b03eddb6 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCollect.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCollect.java @@ -14,6 +14,7 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.disposables.*; import io.reactivex.rxjava3.plugins.RxJavaPlugins; @@ -37,6 +38,7 @@ protected void subscribeActual(Observer t) { try { u = Objects.requireNonNull(initialSupplier.get(), "The initialSupplier returned a null value"); } catch (Throwable e) { + Exceptions.throwIfFatal(e); EmptyDisposable.error(e, t); return; } @@ -86,6 +88,7 @@ public void onNext(T t) { try { collector.accept(u, t); } catch (Throwable e) { + Exceptions.throwIfFatal(e); upstream.dispose(); onError(e); } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCollectSingle.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCollectSingle.java index 09c86cb24a..07db8d5bf3 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCollectSingle.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCollectSingle.java @@ -14,6 +14,7 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.disposables.*; import io.reactivex.rxjava3.internal.fuseable.FuseToObservable; @@ -41,6 +42,7 @@ protected void subscribeActual(SingleObserver t) { try { u = Objects.requireNonNull(initialSupplier.get(), "The initialSupplier returned a null value"); } catch (Throwable e) { + Exceptions.throwIfFatal(e); EmptyDisposable.error(e, t); return; } @@ -94,6 +96,7 @@ public void onNext(T t) { try { collector.accept(u, t); } catch (Throwable e) { + Exceptions.throwIfFatal(e); upstream.dispose(); onError(e); } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplay.java index ba0811b85c..b6a945e8b7 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplay.java @@ -205,6 +205,7 @@ public void connect(Consumer connection) { try { connection.accept(ps); } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); if (doConnect) { ps.shouldConnect.compareAndSet(true, false); } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableScalarXMap.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableScalarXMap.java index 128d327e2b..f4a1bce25b 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableScalarXMap.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableScalarXMap.java @@ -140,6 +140,7 @@ public void subscribeActual(Observer observer) { try { other = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null ObservableSource"); } catch (Throwable e) { + Exceptions.throwIfFatal(e); EmptyDisposable.error(e, observer); return; } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundarySelector.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundarySelector.java index 5ea42da118..8722d6df0c 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundarySelector.java @@ -237,6 +237,7 @@ void drain() { try { endSource = Objects.requireNonNull(closingIndicator.apply(startItem), "The closingIndicator returned a null ObservableSource"); } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); upstream.dispose(); startObserver.dispose(); resources.dispose(); diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/InstantPeriodicTask.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/InstantPeriodicTask.java index 678c82e794..5df8793ebb 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/InstantPeriodicTask.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/InstantPeriodicTask.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.internal.functions.Functions; import io.reactivex.rxjava3.plugins.RxJavaPlugins; @@ -56,6 +57,7 @@ public Void call() throws Exception { setRest(executor.submit(this)); runner = null; } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); runner = null; RxJavaPlugins.onError(ex); } diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTask.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTask.java index 695760f2ca..ac30532e68 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTask.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTask.java @@ -16,6 +16,7 @@ package io.reactivex.rxjava3.internal.schedulers; +import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.plugins.RxJavaPlugins; /** @@ -38,6 +39,7 @@ public void run() { runnable.run(); runner = null; } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); runner = null; lazySet(FINISHED); RxJavaPlugins.onError(ex); diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerPoolFactory.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerPoolFactory.java index cc82349cfc..392a3c3051 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerPoolFactory.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerPoolFactory.java @@ -20,6 +20,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.functions.Function; /** @@ -108,6 +109,7 @@ static int getIntProperty(boolean enabled, String key, int defaultNotFound, int } return Integer.parseInt(value); } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); return defaultNotFound; } } @@ -123,6 +125,7 @@ static boolean getBooleanProperty(boolean enabled, String key, boolean defaultNo } return "true".equals(value); } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); return defaultNotFound; } } diff --git a/src/test/java/io/reactivex/rxjava3/disposables/DisposableTest.java b/src/test/java/io/reactivex/rxjava3/disposables/DisposableTest.java index 66abc6a280..b966a1db0f 100644 --- a/src/test/java/io/reactivex/rxjava3/disposables/DisposableTest.java +++ b/src/test/java/io/reactivex/rxjava3/disposables/DisposableTest.java @@ -213,17 +213,20 @@ public void fromAutoCloseableThrows() throws Throwable { assertTrue(errors.isEmpty()); - d.dispose(); + try { + d.dispose(); + fail("Should have thrown!"); + } catch (TestException expected) { + // expected + } assertTrue(d.isDisposed()); - assertEquals(1, errors.size()); d.dispose(); assertTrue(d.isDisposed()); - assertEquals(1, errors.size()); - TestHelper.assertUndeliverable(errors, 0, TestException.class); + assertTrue(errors.isEmpty()); }); } diff --git a/src/test/java/io/reactivex/rxjava3/validators/CatchThrowIfFatalCheck.java b/src/test/java/io/reactivex/rxjava3/validators/CatchThrowIfFatalCheck.java new file mode 100644 index 0000000000..5b78a2da0a --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/validators/CatchThrowIfFatalCheck.java @@ -0,0 +1,96 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.validators; + +import java.io.File; +import java.nio.file.Files; +import java.util.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.testsupport.TestHelper; + +/** + * Check if a {@code catch(Throwable} is followed by a + * {@code Exceptions.throwIfFatal}, {@code Exceptions.wrapOrThrow} + * or {@code fail} call. + * @since 3.0.0 + */ +public class CatchThrowIfFatalCheck { + + @Test + public void check() throws Exception { + File f = TestHelper.findSource("Flowable"); + if (f == null) { + System.out.println("Unable to find sources of RxJava"); + return; + } + + Queue dirs = new ArrayDeque<>(); + + StringBuilder fail = new StringBuilder(); + int errors = 0; + + File parent = f.getParentFile().getParentFile(); + + dirs.offer(new File(parent.getAbsolutePath().replace('\\', '/'))); + + while (!dirs.isEmpty()) { + f = dirs.poll(); + + File[] list = f.listFiles(); + if (list != null && list.length != 0) { + + for (File u : list) { + if (u.isDirectory()) { + dirs.offer(u); + } else { + List lines = Files.readAllLines(u.toPath()); + + for (int i = 0; i < lines.size(); i++) { + String line = lines.get(i).trim(); + + if (line.startsWith("} catch (Throwable ")) { + String next = lines.get(i + 1).trim(); + boolean throwIfFatal = next.contains("Exceptions.throwIfFatal"); + boolean wrapOrThrow = next.contains("ExceptionHelper.wrapOrThrow"); + boolean failCall = next.startsWith("fail("); + + if (!(throwIfFatal || wrapOrThrow || failCall)) { + errors++; + fail.append("Missing Exceptions.throwIfFatal\n ") + .append(next) + .append("\n at ") + .append(u.getName().replace(".java", "")) + .append(".method(") + .append(u.getName()) + .append(":") + .append(i + 1) + .append(")\n") + ; + } + } + } + } + } + } + } + + if (errors != 0) { + fail.insert(0, "Found " + errors + " cases\n"); + System.out.println(fail); + throw new AssertionError(fail.toString()); + } + } +}