diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 785c7e7d53..de94b4f93b 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -11212,52 +11212,6 @@ public final Flowable lift(FlowableOperator lifte return RxJavaPlugins.onAssembly(new FlowableLift(this, lifter)); } - /** - * Limits both the number of upstream items (after which the sequence completes) - * and the total downstream request amount requested from the upstream to - * possibly prevent the creation of excess items by the upstream. - *

- * The operator requests at most the given {@code count} of items from upstream even - * if the downstream requests more than that. For example, given a {@code limit(5)}, - * if the downstream requests 1, a request of 1 is submitted to the upstream - * and the operator remembers that only 4 items can be requested now on. A request - * of 5 at this point will request 4 from the upstream and any subsequent requests will - * be ignored. - *

- * Note that requests are negotiated on an operator boundary and {@code limit}'s amount - * may not be preserved further upstream. For example, - * {@code source.observeOn(Schedulers.computation()).limit(5)} will still request the - * default (128) elements from the given {@code source}. - *

- * The main use of this operator is with sources that are async boundaries that - * don't interfere with request amounts, such as certain {@code Flowable}-based - * network endpoints that relay downstream request amounts unchanged and are, therefore, - * prone to trigger excessive item creation/transmission over the network. - *

- *
Backpressure:
- *
The operator requests a total of the given {@code count} items from the upstream.
- *
Scheduler:
- *
{@code limit} does not operate by default on a particular {@link Scheduler}.
- *
- *

History: 2.1.6 - experimental - * @param count the maximum number of items and the total request amount, non-negative. - * Zero will immediately cancel the upstream on subscription and complete - * the downstream. - * @return the new Flowable instance - * @see #take(long) - * @see #rebatchRequests(int) - * @since 2.2 - */ - @BackpressureSupport(BackpressureKind.SPECIAL) - @SchedulerSupport(SchedulerSupport.NONE) - @CheckReturnValue - public final Flowable limit(long count) { - if (count < 0) { - throw new IllegalArgumentException("count >= 0 required but it was " + count); - } - return RxJavaPlugins.onAssembly(new FlowableLimit(this, count)); - } - /** * Returns a Flowable that applies a specified function to each item emitted by the source Publisher and * emits the results of these function applications. @@ -15372,6 +15326,7 @@ public final Flowable switchMapSingleDelayError(@NonNull Function(this, mapper, true)); } + /** * Returns a Flowable that emits only the first {@code count} items emitted by the source Publisher. If the source emits fewer than * {@code count} items then all of its items are emitted. @@ -15381,23 +15336,39 @@ public final Flowable switchMapSingleDelayError(@NonNull Function + * Limits both the number of upstream items (after which the sequence completes) + * and the total downstream request amount requested from the upstream to + * possibly prevent the creation of excess items by the upstream. + *

+ * The operator requests at most the given {@code count} of items from upstream even + * if the downstream requests more than that. For example, given a {@code limit(5)}, + * if the downstream requests 1, a request of 1 is submitted to the upstream + * and the operator remembers that only 4 items can be requested now on. A request + * of 5 at this point will request 4 from the upstream and any subsequent requests will + * be ignored. + *

+ * Note that requests are negotiated on an operator boundary and {@code limit}'s amount + * may not be preserved further upstream. For example, + * {@code source.observeOn(Schedulers.computation()).limit(5)} will still request the + * default (128) elements from the given {@code source}. *

*
Backpressure:
- *
The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure - * behavior in case the first request is smaller than the {@code count}. Otherwise, the source {@code Publisher} - * is consumed in an unbounded manner (i.e., without applying backpressure to it).
+ *
The source {@code Publisher} is consumed in a bounded manner.
*
Scheduler:
*
This version of {@code take} does not operate by default on a particular {@link Scheduler}.
*
* * @param count - * the maximum number of items to emit + * the maximum number of items and the total request amount, non-negative. + * Zero will immediately cancel the upstream on subscription and complete + * the downstream. * @return a Flowable that emits only the first {@code count} items emitted by the source Publisher, or * all of the items from the source Publisher if that Publisher emits fewer than {@code count} items * @see ReactiveX operators documentation: Take */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.SPECIAL) // may trigger UNBOUNDED_IN + @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable take(long count) { if (count < 0) { diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableLimit.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableLimit.java deleted file mode 100644 index 7adef9983e..0000000000 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableLimit.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.rxjava3.internal.operators.flowable; - -import java.util.concurrent.atomic.AtomicLong; - -import org.reactivestreams.*; - -import io.reactivex.rxjava3.core.*; -import io.reactivex.rxjava3.internal.subscriptions.*; -import io.reactivex.rxjava3.plugins.RxJavaPlugins; - -/** - * Limits both the total request amount and items received from the upstream. - *

History: 2.1.6 - experimental - * @param the source and output value type - * @since 2.2 - */ -public final class FlowableLimit extends AbstractFlowableWithUpstream { - - final long n; - - public FlowableLimit(Flowable source, long n) { - super(source); - this.n = n; - } - - @Override - protected void subscribeActual(Subscriber s) { - source.subscribe(new LimitSubscriber(s, n)); - } - - static final class LimitSubscriber - extends AtomicLong - implements FlowableSubscriber, Subscription { - - private static final long serialVersionUID = 2288246011222124525L; - - final Subscriber downstream; - - long remaining; - - Subscription upstream; - - LimitSubscriber(Subscriber actual, long remaining) { - this.downstream = actual; - this.remaining = remaining; - lazySet(remaining); - } - - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.upstream, s)) { - if (remaining == 0L) { - s.cancel(); - EmptySubscription.complete(downstream); - } else { - this.upstream = s; - downstream.onSubscribe(this); - } - } - } - - @Override - public void onNext(T t) { - long r = remaining; - if (r > 0L) { - remaining = --r; - downstream.onNext(t); - if (r == 0L) { - upstream.cancel(); - downstream.onComplete(); - } - } - } - - @Override - public void onError(Throwable t) { - if (remaining > 0L) { - remaining = 0L; - downstream.onError(t); - } else { - RxJavaPlugins.onError(t); - } - } - - @Override - public void onComplete() { - if (remaining > 0L) { - remaining = 0L; - downstream.onComplete(); - } - } - - @Override - public void request(long n) { - if (SubscriptionHelper.validate(n)) { - for (;;) { - long r = get(); - if (r == 0L) { - break; - } - long toRequest; - if (r <= n) { - toRequest = r; - } else { - toRequest = n; - } - long u = r - toRequest; - if (compareAndSet(r, u)) { - upstream.request(toRequest); - break; - } - } - } - } - - @Override - public void cancel() { - upstream.cancel(); - } - - } -} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTake.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTake.java index dc07922aa7..9b8e995322 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTake.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTake.java @@ -13,7 +13,7 @@ package io.reactivex.rxjava3.internal.operators.flowable; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.*; @@ -22,46 +22,45 @@ import io.reactivex.rxjava3.plugins.RxJavaPlugins; public final class FlowableTake extends AbstractFlowableWithUpstream { - final long limit; - public FlowableTake(Flowable source, long limit) { + + final long n; + + public FlowableTake(Flowable source, long n) { super(source); - this.limit = limit; + this.n = n; } @Override protected void subscribeActual(Subscriber s) { - source.subscribe(new TakeSubscriber(s, limit)); + source.subscribe(new TakeSubscriber(s, n)); } - static final class TakeSubscriber extends AtomicBoolean implements FlowableSubscriber, Subscription { + static final class TakeSubscriber + extends AtomicLong + implements FlowableSubscriber, Subscription { - private static final long serialVersionUID = -5636543848937116287L; + private static final long serialVersionUID = 2288246011222124525L; final Subscriber downstream; - final long limit; - - boolean done; + long remaining; Subscription upstream; - long remaining; - - TakeSubscriber(Subscriber actual, long limit) { + TakeSubscriber(Subscriber actual, long remaining) { this.downstream = actual; - this.limit = limit; - this.remaining = limit; + this.remaining = remaining; + lazySet(remaining); } @Override public void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.upstream, s)) { - upstream = s; - if (limit == 0L) { + if (remaining == 0L) { s.cancel(); - done = true; EmptySubscription.complete(downstream); } else { + this.upstream = s; downstream.onSubscribe(this); } } @@ -69,21 +68,21 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - if (!done && remaining-- > 0) { - boolean stop = remaining == 0; + long r = remaining; + if (r > 0L) { + remaining = --r; downstream.onNext(t); - if (stop) { + if (r == 0L) { upstream.cancel(); - onComplete(); + downstream.onComplete(); } } } @Override public void onError(Throwable t) { - if (!done) { - done = true; - upstream.cancel(); + if (remaining > 0L) { + remaining = 0L; downstream.onError(t); } else { RxJavaPlugins.onError(t); @@ -92,29 +91,39 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (!done) { - done = true; + if (remaining > 0L) { + remaining = 0L; downstream.onComplete(); } } @Override public void request(long n) { - if (!SubscriptionHelper.validate(n)) { - return; - } - if (!get() && compareAndSet(false, true)) { - if (n >= limit) { - upstream.request(Long.MAX_VALUE); - return; + if (SubscriptionHelper.validate(n)) { + for (;;) { + long r = get(); + if (r == 0L) { + break; + } + long toRequest; + if (r <= n) { + toRequest = r; + } else { + toRequest = n; + } + long u = r - toRequest; + if (compareAndSet(r, u)) { + upstream.request(toRequest); + break; + } } } - upstream.request(n); } @Override public void cancel() { upstream.cancel(); } + } } diff --git a/src/test/java/io/reactivex/rxjava3/flowable/FlowableSubscriberTest.java b/src/test/java/io/reactivex/rxjava3/flowable/FlowableSubscriberTest.java index ed3f8f4cdc..32c954f42b 100644 --- a/src/test/java/io/reactivex/rxjava3/flowable/FlowableSubscriberTest.java +++ b/src/test/java/io/reactivex/rxjava3/flowable/FlowableSubscriberTest.java @@ -331,8 +331,7 @@ public void cancel() { } }).take(2).subscribe(ts); - // FIXME the take now requests Long.MAX_PATH if downstream requests at least the limit - assertEquals(Long.MAX_VALUE, requested.get()); + assertEquals(2, requested.get()); } @Test diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableBufferTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableBufferTest.java index bcd53605fc..781de93fb0 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableBufferTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableBufferTest.java @@ -2047,30 +2047,6 @@ public void openCloseTake() { ts.assertResult(Collections.emptyList()); } - @Test - @SuppressWarnings("unchecked") - public void openCloseLimit() { - PublishProcessor source = PublishProcessor.create(); - - PublishProcessor openIndicator = PublishProcessor.create(); - - PublishProcessor closeIndicator = PublishProcessor.create(); - - TestSubscriber> ts = source - .buffer(openIndicator, Functions.justFunction(closeIndicator)) - .limit(1) - .test(2); - - openIndicator.onNext(1); - closeIndicator.onComplete(); - - assertFalse(source.hasSubscribers()); - assertFalse(openIndicator.hasSubscribers()); - assertFalse(closeIndicator.hasSubscribers()); - - ts.assertResult(Collections.emptyList()); - } - @Test @SuppressWarnings("unchecked") public void openCloseEmptyBackpressure() { diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeWithMaybeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeWithMaybeTest.java index f5dc759890..604daa4950 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeWithMaybeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeWithMaybeTest.java @@ -357,7 +357,7 @@ public void drainExactRequestCancel() { final MaybeSubject cs = MaybeSubject.create(); TestSubscriber ts = pp.mergeWith(cs) - .limit(2) + .take(2) .subscribeWith(new TestSubscriber(2) { @Override public void onNext(Integer t) { diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeWithSingleTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeWithSingleTest.java index b7036b8418..4c5cd09d5b 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeWithSingleTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeWithSingleTest.java @@ -353,7 +353,7 @@ public void drainExactRequestCancel() { final SingleSubject cs = SingleSubject.create(); TestSubscriber ts = pp.mergeWith(cs) - .limit(2) + .take(2) .subscribeWith(new TestSubscriber(2) { @Override public void onNext(Integer t) { diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeTest.java index 7af0002bf7..2885446438 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeTest.java @@ -142,6 +142,27 @@ public void subscribe(Subscriber subscriber) { RxJavaPlugins.reset(); } } + + @Test + public void takeEmitsErrors() { + Flowable.error(new TestException()) + .take(1) + .test() + .assertNoValues() + .assertError(TestException.class); + } + + @Test + public void takeRequestOverflow() { + TestSubscriber ts = Flowable.just(1, 2, 3) + .take(3) + .test(0); + ts.requestMore(1) + .assertValues(1) + .assertNotComplete() + .requestMore(Long.MAX_VALUE) + .assertValues(1, 2, 3); + } @Test public void unsubscribeAfterTake() { @@ -305,7 +326,7 @@ public void cancel() { } }).take(3).subscribe(ts); - assertEquals(Long.MAX_VALUE, requested.get()); + assertEquals(3, requested.get()); } @Test @@ -332,7 +353,7 @@ public void cancel() { }).take(1).subscribe(ts); //FIXME take triggers fast path if downstream requests more than the limit - assertEquals(Long.MAX_VALUE, requested.get()); + assertEquals(1, requested.get()); } @Test @@ -383,7 +404,7 @@ public void accept(long n) { ts.awaitDone(5, TimeUnit.SECONDS); ts.assertComplete(); ts.assertNoErrors(); - assertEquals(3, requests.get()); + assertEquals(2, requests.get()); } @Test diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableLimitTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeTest2.java similarity index 91% rename from src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableLimitTest.java rename to src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeTest2.java index 897bef2a7b..14987ce16a 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableLimitTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeTest2.java @@ -29,7 +29,8 @@ import io.reactivex.rxjava3.subscribers.TestSubscriber; import io.reactivex.rxjava3.testsupport.TestHelper; -public class FlowableLimitTest extends RxJavaTest implements LongConsumer, Action { +// moved tests from FlowableLimitTest to here (limit removed as operator) +public class FlowableTakeTest2 extends RxJavaTest implements LongConsumer, Action { final List requests = new ArrayList(); @@ -49,7 +50,7 @@ public void run() throws Exception { public void shorterSequence() { Flowable.range(1, 5) .doOnRequest(this) - .limit(6) + .take(6) .test() .assertResult(1, 2, 3, 4, 5); @@ -61,7 +62,7 @@ public void exactSequence() { Flowable.range(1, 5) .doOnRequest(this) .doOnCancel(this) - .limit(5) + .take(5) .test() .assertResult(1, 2, 3, 4, 5); @@ -74,7 +75,7 @@ public void exactSequence() { public void longerSequence() { Flowable.range(1, 6) .doOnRequest(this) - .limit(5) + .take(5) .test() .assertResult(1, 2, 3, 4, 5); @@ -84,17 +85,17 @@ public void longerSequence() { @Test public void error() { Flowable.error(new TestException()) - .limit(5) + .take(5) .test() .assertFailure(TestException.class); } @Test - public void limitZero() { + public void takeZero() { Flowable.range(1, 5) .doOnCancel(this) .doOnRequest(this) - .limit(0) + .take(0) .test() .assertResult(); @@ -103,10 +104,10 @@ public void limitZero() { } @Test - public void limitStep() { + public void takeStep() { TestSubscriber ts = Flowable.range(1, 6) .doOnRequest(this) - .limit(5) + .take(5) .test(0L); assertEquals(0, requests.size()); @@ -124,16 +125,16 @@ public void limitStep() { } @Test - public void limitAndTake() { + public void takeThenTake() { Flowable.range(1, 5) .doOnCancel(this) .doOnRequest(this) - .limit(6) + .take(6) .take(5) .test() .assertResult(1, 2, 3, 4, 5); - assertEquals(Arrays.asList(6L, CANCELLED), requests); + assertEquals(Arrays.asList(5L, CANCELLED), requests); } @Test @@ -142,7 +143,7 @@ public void noOverrequest() { TestSubscriber ts = pp .doOnRequest(this) - .limit(5) + .take(5) .test(0L); ts.request(5); @@ -173,7 +174,7 @@ protected void subscribeActual(Subscriber s) { s.onSubscribe(null); } } - .limit(0) + .take(0) .test() .assertResult(); @@ -186,14 +187,14 @@ protected void subscribeActual(Subscriber s) { @Test public void badRequest() { - TestHelper.assertBadRequestReported(Flowable.range(1, 5).limit(3)); + TestHelper.assertBadRequestReported(Flowable.range(1, 5).take(3)); } @Test public void requestRace() { for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { final TestSubscriber ts = Flowable.range(1, 10) - .limit(5) + .take(5) .test(0L); Runnable r = new Runnable() { @@ -214,7 +215,7 @@ public void errorAfterLimitReached() { List errors = TestHelper.trackPluginErrors(); try { Flowable.error(new TestException()) - .limit(0) + .take(0) .test() .assertResult(); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybeTest.java index ec3d2f7a5b..488a803172 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybeTest.java @@ -263,7 +263,7 @@ public MaybeSource apply(Integer v) return Maybe.just(v); } }) - .limit(3) + .take(3) .test() .assertResult(1, 2, 3); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSingleTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSingleTest.java index 3ac974da86..359053cdb5 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSingleTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSingleTest.java @@ -181,7 +181,7 @@ public SingleSource apply(Integer v) return Single.just(v); } }) - .limit(3) + .take(3) .test() .assertResult(1, 2, 3); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapMaybeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapMaybeTest.java index 5d0c272cc4..1eea34f3fa 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapMaybeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapMaybeTest.java @@ -140,7 +140,7 @@ public MaybeSource apply(Integer v) return Maybe.just(v); } }) - .limit(3) + .take(3) .test() .assertResult(1, 2, 3); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapSingleTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapSingleTest.java index bcfe4a1e89..a834f327f9 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapSingleTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapSingleTest.java @@ -88,7 +88,7 @@ public SingleSource apply(Integer v) return Single.just(v); } }) - .limit(3) + .take(3) .test() .assertResult(1, 2, 3); } diff --git a/src/test/java/io/reactivex/rxjava3/tck/LimitTckTest.java b/src/test/java/io/reactivex/rxjava3/tck/LimitTckTest.java index 44bbc7363f..3fe61b627a 100644 --- a/src/test/java/io/reactivex/rxjava3/tck/LimitTckTest.java +++ b/src/test/java/io/reactivex/rxjava3/tck/LimitTckTest.java @@ -24,7 +24,7 @@ public class LimitTckTest extends BaseTck { @Override public Publisher createPublisher(long elements) { return - Flowable.range(0, (int)elements * 2).limit(elements) + Flowable.range(0, (int)elements * 2).take(elements) ; } } diff --git a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java index f515064913..d8b5d14f7e 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java @@ -190,9 +190,8 @@ public void checkParallelFlowable() { addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class)); addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class, Scheduler.class)); - // zero take/limit is allowed + // zero take is allowed addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "take", Long.TYPE)); - addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "limit", Long.TYPE)); // negative time is considered as zero time addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class));