Skip to content

3.x: [Java 8] Add AutoCloseable <-> Disposable conversions, nicen docs #6780

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import io.reactivex.rxjava3.internal.util.ExceptionHelper;

/**
* A Disposable container that manages an Action instance.
* A Disposable container that manages an {@link Action} instance.
*/
final class ActionDisposable extends ReferenceDisposable<Action> {

Expand All @@ -35,4 +35,9 @@ protected void onDisposed(@NonNull Action value) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}

@Override
public String toString() {
return "ActionDisposable(disposed=" + isDisposed() + ", " + get() + ")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.disposables;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* A disposable container that manages an {@link AutoCloseable} instance.
* @since 3.0.0
*/
final class AutoCloseableDisposable extends ReferenceDisposable<AutoCloseable> {

private static final long serialVersionUID = -6646144244598696847L;

AutoCloseableDisposable(AutoCloseable value) {
super(value);
}

@Override
protected void onDisposed(@NonNull AutoCloseable value) {
try {
value.close();
} catch (Throwable ex) {
RxJavaPlugins.onError(ex);
}
}

@Override
public String toString() {
return "AutoCloseableDisposable(disposed=" + isDisposed() + ", " + get() + ")";
}

}
58 changes: 43 additions & 15 deletions src/main/java/io/reactivex/rxjava3/disposables/Disposables.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ private Disposables() {
}

/**
* Construct a Disposable by wrapping a Runnable that is
* executed exactly once when the Disposable is disposed.
* Construct a {@code Disposable} by wrapping a {@link Runnable} that is
* executed exactly once when the {@code Disposable} is disposed.
* @param run the Runnable to wrap
* @return the new Disposable instance
*/
Expand All @@ -47,8 +47,8 @@ public static Disposable fromRunnable(@NonNull Runnable run) {
}

/**
* Construct a Disposable by wrapping a Action that is
* executed exactly once when the Disposable is disposed.
* Construct a {@code Disposable} by wrapping a {@link Action} that is
* executed exactly once when the {@code Disposable} is disposed.
* @param run the Action to wrap
* @return the new Disposable instance
*/
Expand All @@ -59,10 +59,13 @@ public static Disposable fromAction(@NonNull Action run) {
}

/**
* Construct a Disposable by wrapping a Future that is
* cancelled exactly once when the Disposable is disposed.
* Construct a {@code Disposable} by wrapping a {@link Future} that is
* cancelled exactly once when the {@code Disposable} is disposed.
* <p>
* The {@code Future} is cancelled with {@code mayInterruptIfRunning == true}.
* @param future the Future to wrap
* @return the new Disposable instance
* @see #fromFuture(Future, boolean)
*/
@NonNull
public static Disposable fromFuture(@NonNull Future<?> future) {
Expand All @@ -71,10 +74,10 @@ public static Disposable fromFuture(@NonNull Future<?> future) {
}

/**
* Construct a Disposable by wrapping a Future that is
* cancelled exactly once when the Disposable is disposed.
* Construct a {@code Disposable} by wrapping a {@link Future} that is
* cancelled exactly once when the {@code Disposable} is disposed.
* @param future the Future to wrap
* @param allowInterrupt if true, the future cancel happens via Future.cancel(true)
* @param allowInterrupt if true, the future cancel happens via {@code Future.cancel(true)}
* @return the new Disposable instance
*/
@NonNull
Expand All @@ -84,8 +87,8 @@ public static Disposable fromFuture(@NonNull Future<?> future, boolean allowInte
}

/**
* Construct a Disposable by wrapping a Subscription that is
* cancelled exactly once when the Disposable is disposed.
* Construct a {@code Disposable} by wrapping a {@link Subscription} that is
* cancelled exactly once when the {@code Disposable} is disposed.
* @param subscription the Runnable to wrap
* @return the new Disposable instance
*/
Expand All @@ -96,17 +99,42 @@ public static Disposable fromSubscription(@NonNull Subscription subscription) {
}

/**
* Returns a new, non-disposed Disposable instance.
* @return a new, non-disposed Disposable instance
* Construct a {@code Disposable} by wrapping an {@link AutoCloseable} that is
* closed exactly once when the {@code Disposable} is disposed.
* @param autoCloseable the AutoCloseable to wrap
* @return the new Disposable instance
* @since 3.0.0
*/
@NonNull
public static Disposable fromAutoCloseable(@NonNull AutoCloseable autoCloseable) {
Objects.requireNonNull(autoCloseable, "autoCloseable is null");
return new AutoCloseableDisposable(autoCloseable);
}

/**
* Construct an {@link AutoCloseable} by wrapping a {@code Disposable} that is
* disposed when the returned {@code AutoCloseable} is closed.
* @param disposable the Disposable instance
* @return the new AutoCloseable instance
* @since 3.0.0
*/
@NonNull
public static AutoCloseable toAutoCloseable(@NonNull Disposable disposable) {
return disposable::dispose;
}

/**
* Returns a new, non-disposed {@code Disposable} instance.
* @return a new, non-disposed {@code Disposable} instance
*/
@NonNull
public static Disposable empty() {
return fromRunnable(Functions.EMPTY_RUNNABLE);
}

/**
* Returns a disposed Disposable instance.
* @return a disposed Disposable instance
* Returns a shared, disposed {@code Disposable} instance.
* @return a shared, disposed {@code Disposable} instance
*/
@NonNull
public static Disposable disposed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.util.concurrent.atomic.AtomicReference;

/**
* A Disposable container that cancels a Future instance.
* A Disposable container that cancels a {@link Future} instance.
*/
final class FutureDisposable extends AtomicReference<Future<?>> implements Disposable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import io.reactivex.rxjava3.annotations.NonNull;

/**
* A disposable container that manages a Runnable instance.
* A disposable container that manages a {@link Runnable} instance.
*/
final class RunnableDisposable extends ReferenceDisposable<Runnable> {

Expand Down
111 changes: 95 additions & 16 deletions src/test/java/io/reactivex/rxjava3/disposables/DisposablesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.reactivestreams.Subscription;

import io.reactivex.rxjava3.core.RxJavaTest;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Expand All @@ -34,11 +35,19 @@ public class DisposablesTest extends RxJavaTest {

@Test
public void unsubscribeOnlyOnce() {
Runnable dispose = mock(Runnable.class);
Disposable subscription = Disposables.fromRunnable(dispose);
subscription.dispose();
subscription.dispose();
verify(dispose, times(1)).run();
Runnable run = mock(Runnable.class);

Disposable d = Disposables.fromRunnable(run);

assertTrue(d.toString(), d.toString().contains("RunnableDisposable(disposed=false, "));

d.dispose();
assertTrue(d.toString(), d.toString().contains("RunnableDisposable(disposed=true, "));

d.dispose();
assertTrue(d.toString(), d.toString().contains("RunnableDisposable(disposed=true, "));

verify(run, times(1)).run();
}

@Test
Expand All @@ -61,22 +70,20 @@ public void utilityClass() {
}

@Test
public void fromAction() {
class AtomicAction extends AtomicBoolean implements Action {
public void fromAction() throws Throwable {
Action action = mock(Action.class);

private static final long serialVersionUID = -1517510584253657229L;
Disposable d = Disposables.fromAction(action);

@Override
public void run() throws Exception {
set(true);
}
}
assertTrue(d.toString(), d.toString().contains("ActionDisposable(disposed=false, "));

AtomicAction aa = new AtomicAction();
d.dispose();
assertTrue(d.toString(), d.toString().contains("ActionDisposable(disposed=true, "));

Disposables.fromAction(aa).dispose();
d.dispose();
assertTrue(d.toString(), d.toString().contains("ActionDisposable(disposed=true, "));

assertTrue(aa.get());
verify(action, times(1)).run();
}

@Test
Expand Down Expand Up @@ -174,4 +181,76 @@ public void setOnceTwice() {
RxJavaPlugins.reset();
}
}

@Test
public void fromAutoCloseable() {
AtomicInteger counter = new AtomicInteger();

AutoCloseable ac = () -> counter.getAndIncrement();

Disposable d = Disposables.fromAutoCloseable(ac);

assertFalse(d.isDisposed());
assertEquals(0, counter.get());
assertTrue(d.toString(), d.toString().contains("AutoCloseableDisposable(disposed=false, "));

d.dispose();

assertTrue(d.isDisposed());
assertEquals(1, counter.get());
assertTrue(d.toString(), d.toString().contains("AutoCloseableDisposable(disposed=true, "));

d.dispose();

assertTrue(d.isDisposed());
assertEquals(1, counter.get());
assertTrue(d.toString(), d.toString().contains("AutoCloseableDisposable(disposed=true, "));
}

@Test
public void fromAutoCloseableThrows() throws Throwable {
TestHelper.withErrorTracking(errors -> {
AutoCloseable ac = () -> { throw new TestException(); };

Disposable d = Disposables.fromAutoCloseable(ac);

assertFalse(d.isDisposed());

assertTrue(errors.isEmpty());

d.dispose();

assertTrue(d.isDisposed());
assertEquals(1, errors.size());

d.dispose();

assertTrue(d.isDisposed());
assertEquals(1, errors.size());

TestHelper.assertUndeliverable(errors, 0, TestException.class);
});
}

@Test
public void toAutoCloseable() throws Exception {
AtomicInteger counter = new AtomicInteger();

Disposable d = Disposables.fromAction(() -> counter.getAndIncrement());

AutoCloseable ac = Disposables.toAutoCloseable(d);

assertFalse(d.isDisposed());
assertEquals(0, counter.get());

ac.close();

assertTrue(d.isDisposed());
assertEquals(1, counter.get());

ac.close();

assertTrue(d.isDisposed());
assertEquals(1, counter.get());
}
}