From 85d2cf5e282848f0a39536cfc5d62fd75a80de93 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 28 Jan 2020 13:32:02 +0100 Subject: [PATCH 1/2] 3.x: Use more appropriate operators when delegating to Flowable ops --- .../java/io/reactivex/rxjava3/core/Maybe.java | 20 ++----- .../io/reactivex/rxjava3/core/Single.java | 47 +++++++--------- .../FlowableFlatMapMaybePublisher.java | 49 +++++++++++++++++ .../FlowableFlatMapSinglePublisher.java | 49 +++++++++++++++++ .../FlowableConcatMapMaybePublisher.java | 55 +++++++++++++++++++ .../FlowableConcatMapSinglePublisher.java | 55 +++++++++++++++++++ .../mixed/ObservableConcatMapSingle.java | 4 +- .../io/reactivex/rxjava3/maybe/MaybeTest.java | 2 - 8 files changed, 237 insertions(+), 44 deletions(-) create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapMaybePublisher.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapSinglePublisher.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybePublisher.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSinglePublisher.java diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index 40902efa53..ef0efeebc2 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -344,11 +344,10 @@ public static Flowable concat(@NonNull Publisher<@NonNull ? extends Maybe @CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) - @SuppressWarnings({ "unchecked", "rawtypes" }) public static Flowable concat(@NonNull Publisher<@NonNull ? extends MaybeSource> sources, int prefetch) { Objects.requireNonNull(sources, "sources is null"); ObjectHelper.verifyPositive(prefetch, "prefetch"); - return RxJavaPlugins.onAssembly(new FlowableConcatMapPublisher(sources, MaybeToPublisher.instance(), prefetch, ErrorMode.IMMEDIATE)); + return RxJavaPlugins.onAssembly(new FlowableConcatMapMaybePublisher<>(sources, Functions.identity(), ErrorMode.IMMEDIATE, prefetch)); } /** @@ -1141,7 +1140,7 @@ public static Maybe fromRunnable(@NonNull Runnable run) { @SchedulerSupport(SchedulerSupport.NONE) @NonNull public static Flowable merge(@NonNull Iterable<@NonNull ? extends MaybeSource> sources) { - return merge(Flowable.fromIterable(sources)); + return Flowable.fromIterable(sources).flatMapMaybe(Functions.identity(), false, Integer.MAX_VALUE); } /** @@ -1218,11 +1217,10 @@ public static Flowable merge(@NonNull Publisher<@NonNull ? extends MaybeS @CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) - @SuppressWarnings({ "unchecked", "rawtypes" }) public static Flowable merge(@NonNull Publisher<@NonNull ? extends MaybeSource> sources, int maxConcurrency) { Objects.requireNonNull(sources, "sources is null"); ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); - return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), false, maxConcurrency, 1)); + return RxJavaPlugins.onAssembly(new FlowableFlatMapMaybePublisher<>(sources, Functions.identity(), false, maxConcurrency)); } /** @@ -1490,7 +1488,6 @@ public static Flowable mergeArray(MaybeSource... sources) { * @throws NullPointerException if {@code sources} is {@code null} * @see ReactiveX operators documentation: Merge */ - @SuppressWarnings({ "unchecked", "rawtypes" }) @BackpressureSupport(BackpressureKind.FULL) @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @@ -1498,10 +1495,7 @@ public static Flowable mergeArray(MaybeSource... sources) { @NonNull public static Flowable mergeArrayDelayError(@NonNull MaybeSource... sources) { Objects.requireNonNull(sources, "sources is null"); - if (sources.length == 0) { - return Flowable.empty(); - } - return Flowable.fromArray(sources).flatMap((Function)MaybeToPublisher.instance(), true, sources.length); + return Flowable.fromArray(sources).flatMapMaybe(Functions.identity(), true, Math.max(1, sources.length)); } /** @@ -1533,13 +1527,12 @@ public static Flowable mergeArrayDelayError(@NonNull MaybeSourceReactiveX operators documentation: Merge */ - @SuppressWarnings({ "unchecked", "rawtypes" }) @BackpressureSupport(BackpressureKind.FULL) @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @NonNull public static Flowable mergeDelayError(@NonNull Iterable<@NonNull ? extends MaybeSource> sources) { - return Flowable.fromIterable(sources).flatMap((Function)MaybeToPublisher.instance(), true); + return Flowable.fromIterable(sources).flatMapMaybe(Functions.identity(), true, Integer.MAX_VALUE); } /** @@ -1609,7 +1602,6 @@ public static Flowable mergeDelayError(@NonNull Publisher<@NonNull ? exte * @see ReactiveX operators documentation: Merge * @since 2.2 */ - @SuppressWarnings({ "unchecked", "rawtypes" }) @BackpressureSupport(BackpressureKind.FULL) @CheckReturnValue @NonNull @@ -1617,7 +1609,7 @@ public static Flowable mergeDelayError(@NonNull Publisher<@NonNull ? exte public static Flowable mergeDelayError(@NonNull Publisher<@NonNull ? extends MaybeSource> sources, int maxConcurrency) { Objects.requireNonNull(sources, "sources is null"); ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); - return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), true, maxConcurrency, 1)); + return RxJavaPlugins.onAssembly(new FlowableFlatMapMaybePublisher<>(sources, Functions.identity(), true, maxConcurrency)); } /** diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index 5911000d28..7dc017d45c 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -20,7 +20,6 @@ import org.reactivestreams.*; import io.reactivex.rxjava3.annotations.*; -import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.functions.*; @@ -32,7 +31,7 @@ import io.reactivex.rxjava3.internal.operators.flowable.*; import io.reactivex.rxjava3.internal.operators.maybe.*; import io.reactivex.rxjava3.internal.operators.mixed.*; -import io.reactivex.rxjava3.internal.operators.observable.*; +import io.reactivex.rxjava3.internal.operators.observable.ObservableSingleSingle; import io.reactivex.rxjava3.internal.operators.single.*; import io.reactivex.rxjava3.internal.util.ErrorMode; import io.reactivex.rxjava3.observers.TestObserver; @@ -195,7 +194,7 @@ public static Single ambArray(@NonNull SingleSource... sourc @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.FULL) public static Flowable concat(@NonNull Iterable<@NonNull ? extends SingleSource> sources) { - return concat(Flowable.fromIterable(sources)); + return Flowable.fromIterable(sources).concatMapSingleDelayError(Functions.identity(), false); } /** @@ -216,10 +215,9 @@ public static Flowable concat(@NonNull Iterable<@NonNull ? extends Single @CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) - @SuppressWarnings({ "unchecked", "rawtypes" }) public static Observable concat(@NonNull ObservableSource> sources) { Objects.requireNonNull(sources, "sources is null"); - return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, SingleInternalHelper.toObservable(), 2, ErrorMode.IMMEDIATE)); + return RxJavaPlugins.onAssembly(new ObservableConcatMapSingle<>(sources, Functions.identity(), ErrorMode.IMMEDIATE, 2)); } /** @@ -272,11 +270,10 @@ public static Flowable concat(@NonNull Publisher<@NonNull ? extends Singl @NonNull @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) - @SuppressWarnings({ "unchecked", "rawtypes" }) public static Flowable concat(@NonNull Publisher<@NonNull ? extends SingleSource> sources, int prefetch) { Objects.requireNonNull(sources, "sources is null"); ObjectHelper.verifyPositive(prefetch, "prefetch"); - return RxJavaPlugins.onAssembly(new FlowableConcatMapPublisher(sources, SingleInternalHelper.toFlowable(), prefetch, ErrorMode.IMMEDIATE)); + return RxJavaPlugins.onAssembly(new FlowableConcatMapSinglePublisher<>(sources, Functions.identity(), ErrorMode.IMMEDIATE, prefetch)); } /** @@ -308,7 +305,7 @@ public static Flowable concat( ) { Objects.requireNonNull(source1, "source1 is null"); Objects.requireNonNull(source2, "source2 is null"); - return concat(Flowable.fromArray(source1, source2)); + return Flowable.fromArray(source1, source2).concatMapSingleDelayError(Functions.identity(), false); } /** @@ -344,7 +341,7 @@ public static Flowable concat( Objects.requireNonNull(source1, "source1 is null"); Objects.requireNonNull(source2, "source2 is null"); Objects.requireNonNull(source3, "source3 is null"); - return concat(Flowable.fromArray(source1, source2, source3)); + return Flowable.fromArray(source1, source2, source3).concatMapSingleDelayError(Functions.identity(), false); } /** @@ -383,7 +380,7 @@ public static Flowable concat( Objects.requireNonNull(source2, "source2 is null"); Objects.requireNonNull(source3, "source3 is null"); Objects.requireNonNull(source4, "source4 is null"); - return concat(Flowable.fromArray(source1, source2, source3, source4)); + return Flowable.fromArray(source1, source2, source3, source4).concatMapSingleDelayError(Functions.identity(), false); } /** @@ -409,7 +406,7 @@ public static Flowable concat( @SchedulerSupport(SchedulerSupport.NONE) @SafeVarargs public static Flowable concatArray(@NonNull SingleSource... sources) { - return Flowable.fromArray(sources).concatMap(SingleInternalHelper.toFlowable(), 2); + return Flowable.fromArray(sources).concatMapSingleDelayError(Functions.identity(), false); } /** @@ -435,7 +432,7 @@ public static Flowable concatArray(@NonNull SingleSource... @SchedulerSupport(SchedulerSupport.NONE) @SafeVarargs public static Flowable concatArrayDelayError(@NonNull SingleSource... sources) { - return Flowable.fromArray(sources).concatMapDelayError(SingleInternalHelper.toFlowable(), true, 2); + return Flowable.fromArray(sources).concatMapSingleDelayError(Functions.identity(), true); } /** @@ -1091,7 +1088,7 @@ public static Single fromObservable(@NonNull ObservableSource Flowable merge(@NonNull Iterable<@NonNull ? extends SingleSource> sources) { - return merge(Flowable.fromIterable(sources)); + return Flowable.fromIterable(sources).flatMapSingle(Functions.identity()); } /** @@ -1129,10 +1126,9 @@ public static Flowable merge(@NonNull Iterable<@NonNull ? extends SingleS @NonNull @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) - @SuppressWarnings({ "unchecked", "rawtypes" }) public static Flowable merge(@NonNull Publisher<@NonNull ? extends SingleSource> sources) { Objects.requireNonNull(sources, "sources is null"); - return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, SingleInternalHelper.toFlowable(), false, Integer.MAX_VALUE, Flowable.bufferSize())); + return RxJavaPlugins.onAssembly(new FlowableFlatMapSinglePublisher<>(sources, Functions.identity(), false, Integer.MAX_VALUE)); } /** @@ -1212,7 +1208,7 @@ public static Flowable merge( ) { Objects.requireNonNull(source1, "source1 is null"); Objects.requireNonNull(source2, "source2 is null"); - return merge(Flowable.fromArray(source1, source2)); + return Flowable.fromArray(source1, source2).flatMapSingle(Functions.identity(), false, Integer.MAX_VALUE); } /** @@ -1265,7 +1261,7 @@ public static Flowable merge( Objects.requireNonNull(source1, "source1 is null"); Objects.requireNonNull(source2, "source2 is null"); Objects.requireNonNull(source3, "source3 is null"); - return merge(Flowable.fromArray(source1, source2, source3)); + return Flowable.fromArray(source1, source2, source3).flatMapSingle(Functions.identity(), false, Integer.MAX_VALUE); } /** @@ -1321,7 +1317,7 @@ public static Flowable merge( Objects.requireNonNull(source2, "source2 is null"); Objects.requireNonNull(source3, "source3 is null"); Objects.requireNonNull(source4, "source4 is null"); - return merge(Flowable.fromArray(source1, source2, source3, source4)); + return Flowable.fromArray(source1, source2, source3, source4).flatMapSingle(Functions.identity(), false, Integer.MAX_VALUE); } /** @@ -1360,7 +1356,7 @@ public static Flowable merge( @SchedulerSupport(SchedulerSupport.NONE) @SafeVarargs public static Flowable mergeArray(SingleSource... sources) { - return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), false, sources.length); + return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), false, Math.max(1, sources.length)); } /** @@ -1396,7 +1392,7 @@ public static Flowable mergeArray(SingleSource... sources) { @SafeVarargs @NonNull public static Flowable mergeArrayDelayError(@NonNull SingleSource... sources) { - return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), true, sources.length); + return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), true, Math.max(1, sources.length)); } /** @@ -1423,7 +1419,7 @@ public static Flowable mergeArrayDelayError(@NonNull SingleSource Flowable mergeDelayError(@NonNull Iterable<@NonNull ? extends SingleSource> sources) { - return mergeDelayError(Flowable.fromIterable(sources)); + return Flowable.fromIterable(sources).flatMapSingle(Functions.identity(), true, Integer.MAX_VALUE); } /** @@ -1449,10 +1445,9 @@ public static Flowable mergeDelayError(@NonNull Iterable<@NonNull ? exten @NonNull @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) - @SuppressWarnings({ "unchecked", "rawtypes" }) public static Flowable mergeDelayError(@NonNull Publisher<@NonNull ? extends SingleSource> sources) { Objects.requireNonNull(sources, "sources is null"); - return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, SingleInternalHelper.toFlowable(), true, Integer.MAX_VALUE, Flowable.bufferSize())); + return RxJavaPlugins.onAssembly(new FlowableFlatMapSinglePublisher<>(sources, Functions.identity(), true, Integer.MAX_VALUE)); } /** @@ -1490,7 +1485,7 @@ public static Flowable mergeDelayError( ) { Objects.requireNonNull(source1, "source1 is null"); Objects.requireNonNull(source2, "source2 is null"); - return mergeDelayError(Flowable.fromArray(source1, source2)); + return Flowable.fromArray(source1, source2).flatMapSingle(Functions.identity(), true, Integer.MAX_VALUE); } /** @@ -1532,7 +1527,7 @@ public static Flowable mergeDelayError( Objects.requireNonNull(source1, "source1 is null"); Objects.requireNonNull(source2, "source2 is null"); Objects.requireNonNull(source3, "source3 is null"); - return mergeDelayError(Flowable.fromArray(source1, source2, source3)); + return Flowable.fromArray(source1, source2, source3).flatMapSingle(Functions.identity(), true, Integer.MAX_VALUE); } /** @@ -1577,7 +1572,7 @@ public static Flowable mergeDelayError( Objects.requireNonNull(source2, "source2 is null"); Objects.requireNonNull(source3, "source3 is null"); Objects.requireNonNull(source4, "source4 is null"); - return mergeDelayError(Flowable.fromArray(source1, source2, source3, source4)); + return Flowable.fromArray(source1, source2, source3, source4).flatMapSingle(Functions.identity(), true, Integer.MAX_VALUE); } /** diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapMaybePublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapMaybePublisher.java new file mode 100644 index 0000000000..f300607052 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapMaybePublisher.java @@ -0,0 +1,49 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import org.reactivestreams.*; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMapMaybe.FlatMapMaybeSubscriber; + +/** + * Maps upstream values into MaybeSources and merges their signals into one sequence. + * @param the source value type + * @param the result value type + */ +public final class FlowableFlatMapMaybePublisher extends Flowable { + + final Publisher source; + + final Function> mapper; + + final boolean delayErrors; + + final int maxConcurrency; + + public FlowableFlatMapMaybePublisher(Publisher source, Function> mapper, + boolean delayError, int maxConcurrency) { + this.source = source; + this.mapper = mapper; + this.delayErrors = delayError; + this.maxConcurrency = maxConcurrency; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new FlatMapMaybeSubscriber<>(s, mapper, delayErrors, maxConcurrency)); + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapSinglePublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapSinglePublisher.java new file mode 100644 index 0000000000..457a728523 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapSinglePublisher.java @@ -0,0 +1,49 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import org.reactivestreams.*; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMapSingle.FlatMapSingleSubscriber; + +/** + * Maps upstream values into SingleSources and merges their signals into one sequence. + * @param the source value type + * @param the result value type + */ +public final class FlowableFlatMapSinglePublisher extends Flowable { + + final Publisher source; + + final Function> mapper; + + final boolean delayErrors; + + final int maxConcurrency; + + public FlowableFlatMapSinglePublisher(Publisher source, Function> mapper, + boolean delayError, int maxConcurrency) { + this.source = source; + this.mapper = mapper; + this.delayErrors = delayError; + this.maxConcurrency = maxConcurrency; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new FlatMapSingleSubscriber<>(s, mapper, delayErrors, maxConcurrency)); + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybePublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybePublisher.java new file mode 100644 index 0000000000..8651cdcb60 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybePublisher.java @@ -0,0 +1,55 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.mixed; + +import org.reactivestreams.*; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.operators.mixed.FlowableConcatMapMaybe.ConcatMapMaybeSubscriber; +import io.reactivex.rxjava3.internal.util.ErrorMode; + +/** + * Maps each upstream item into a {@link MaybeSource}, subscribes to them one after the other terminates + * and relays their success values, optionally delaying any errors till the main and inner sources + * terminate. + *

History: 2.1.11 - experimental + * @param the upstream element type + * @param the output element type + * @since 2.2 + */ +public final class FlowableConcatMapMaybePublisher extends Flowable { + + final Publisher source; + + final Function> mapper; + + final ErrorMode errorMode; + + final int prefetch; + + public FlowableConcatMapMaybePublisher(Publisher source, + Function> mapper, + ErrorMode errorMode, int prefetch) { + this.source = source; + this.mapper = mapper; + this.errorMode = errorMode; + this.prefetch = prefetch; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new ConcatMapMaybeSubscriber<>(s, mapper, prefetch, errorMode)); + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSinglePublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSinglePublisher.java new file mode 100644 index 0000000000..5b34314d81 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSinglePublisher.java @@ -0,0 +1,55 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.mixed; + +import org.reactivestreams.*; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.operators.mixed.FlowableConcatMapSingle.ConcatMapSingleSubscriber; +import io.reactivex.rxjava3.internal.util.ErrorMode; + +/** + * Maps each upstream item into a {@link SingleSource}, subscribes to them one after the other terminates + * and relays their success values, optionally delaying any errors till the main and inner sources + * terminate. + *

History: 2.1.11 - experimental + * @param the upstream element type + * @param the output element type + * @since 2.2 + */ +public final class FlowableConcatMapSinglePublisher extends Flowable { + + final Publisher source; + + final Function> mapper; + + final ErrorMode errorMode; + + final int prefetch; + + public FlowableConcatMapSinglePublisher(Publisher source, + Function> mapper, + ErrorMode errorMode, int prefetch) { + this.source = source; + this.mapper = mapper; + this.errorMode = errorMode; + this.prefetch = prefetch; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new ConcatMapSingleSubscriber<>(s, mapper, prefetch, errorMode)); + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapSingle.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapSingle.java index 5fc6b8a520..70d913cd42 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapSingle.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapSingle.java @@ -36,7 +36,7 @@ */ public final class ObservableConcatMapSingle extends Observable { - final Observable source; + final ObservableSource source; final Function> mapper; @@ -44,7 +44,7 @@ public final class ObservableConcatMapSingle extends Observable { final int prefetch; - public ObservableConcatMapSingle(Observable source, + public ObservableConcatMapSingle(ObservableSource source, Function> mapper, ErrorMode errorMode, int prefetch) { this.source = source; diff --git a/src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java b/src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java index 94cf62d641..401bdcc918 100644 --- a/src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java @@ -2514,8 +2514,6 @@ public void mergeArrayDelayError() { Maybe.mergeArrayDelayError(Maybe.error(new TestException()), Maybe.empty(), Maybe.just(1)) .test() .assertFailure(TestException.class, 1); - - assertSame(Flowable.empty(), Maybe.mergeArrayDelayError()); } @Test From f9c3ef06698dcb4ee4deafdb59d2541d7ca6a258 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 28 Jan 2020 13:58:50 +0100 Subject: [PATCH 2/2] Remove now-unused classes --- .../flowable/FlowableConcatMapPublisher.java | 49 ------------------- .../flowable/FlowableFlatMapPublisher.java | 45 ----------------- .../single/SingleInternalHelper.java | 16 ------ .../single/SingleInternalHelperTest.java | 6 --- 4 files changed, 116 deletions(-) delete mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapPublisher.java delete mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapPublisher.java diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapPublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapPublisher.java deleted file mode 100644 index 8ee6afcd87..0000000000 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapPublisher.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ -package io.reactivex.rxjava3.internal.operators.flowable; - -import org.reactivestreams.*; - -import io.reactivex.rxjava3.core.Flowable; -import io.reactivex.rxjava3.functions.Function; -import io.reactivex.rxjava3.internal.util.ErrorMode; - -public final class FlowableConcatMapPublisher extends Flowable { - - final Publisher source; - - final Function> mapper; - - final int prefetch; - - final ErrorMode errorMode; - - public FlowableConcatMapPublisher(Publisher source, - Function> mapper, - int prefetch, ErrorMode errorMode) { - this.source = source; - this.mapper = mapper; - this.prefetch = prefetch; - this.errorMode = errorMode; - } - - @Override - protected void subscribeActual(Subscriber s) { - - if (FlowableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) { - return; - } - - source.subscribe(FlowableConcatMap.subscribe(s, mapper, prefetch, errorMode)); - } -} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapPublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapPublisher.java deleted file mode 100644 index 0f5be8917d..0000000000 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapPublisher.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.rxjava3.internal.operators.flowable; - -import org.reactivestreams.*; - -import io.reactivex.rxjava3.core.Flowable; -import io.reactivex.rxjava3.functions.Function; - -public final class FlowableFlatMapPublisher extends Flowable { - final Publisher source; - final Function> mapper; - final boolean delayErrors; - final int maxConcurrency; - final int bufferSize; - - public FlowableFlatMapPublisher(Publisher source, - Function> mapper, - boolean delayErrors, int maxConcurrency, int bufferSize) { - this.source = source; - this.mapper = mapper; - this.delayErrors = delayErrors; - this.maxConcurrency = maxConcurrency; - this.bufferSize = bufferSize; - } - - @Override - protected void subscribeActual(Subscriber s) { - if (FlowableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) { - return; - } - source.subscribe(FlowableFlatMap.subscribe(s, mapper, delayErrors, maxConcurrency, bufferSize)); - } -} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleInternalHelper.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleInternalHelper.java index 95ed46144c..47f8bf1e88 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleInternalHelper.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleInternalHelper.java @@ -19,7 +19,6 @@ import org.reactivestreams.Publisher; import io.reactivex.rxjava3.core.*; -import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.functions.*; /** @@ -105,19 +104,4 @@ public Iterator> iterator() { public static Iterable> iterableToFlowable(final Iterable> sources) { return new ToFlowableIterable<>(sources); } - - @SuppressWarnings("rawtypes") - enum ToObservable implements Function { - INSTANCE; - @SuppressWarnings("unchecked") - @Override - public Observable apply(SingleSource v) { - return new SingleToObservable(v); - } - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static Function, Observable> toObservable() { - return (Function)ToObservable.INSTANCE; - } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleInternalHelperTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleInternalHelperTest.java index e551b882d1..5d5e152c1b 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleInternalHelperTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleInternalHelperTest.java @@ -41,12 +41,6 @@ public void toFlowableEnum() { assertNotNull(SingleInternalHelper.ToFlowable.valueOf("INSTANCE")); } - @Test - public void toObservableEnum() { - assertEquals(1, SingleInternalHelper.ToObservable.values().length); - assertNotNull(SingleInternalHelper.ToObservable.valueOf("INSTANCE")); - } - @Test public void singleIterableToFlowableIterable() { Iterable> it = SingleInternalHelper.iterableToFlowable(