From d690f3c503098610e69663af3087487ee8cc0567 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 20 Nov 2019 13:59:42 +0100 Subject: [PATCH 1/6] 3.x: Fix parallel() on grouped flowable not replenishing properly --- .../operators/flowable/FlowableGroupBy.java | 20 +++++++++---- .../flowable/FlowableGroupByTest.java | 30 +++++++++++++++++++ .../flowable/FlowableObserveOnTest.java | 17 +++++++++++ 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java index 60b9437f05..6b76558a5e 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java @@ -743,6 +743,20 @@ public T poll() { produced++; return v; } + tryReplenish(); + return null; + } + + @Override + public boolean isEmpty() { + if (queue.isEmpty()) { + tryReplenish(); + return true; + } + return false; + } + + void tryReplenish() { int p = produced; if (p != 0) { produced = 0; @@ -750,12 +764,6 @@ public T poll() { parent.upstream.request(p); } } - return null; - } - - @Override - public boolean isEmpty() { - return queue.isEmpty(); } @Override diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java index 825c11354e..f5bf461dda 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java @@ -2413,4 +2413,34 @@ public void run() { } } } + + @Test + public void fusedParallelGroupProcessing() { + Flowable.range(0, 500000) + .subscribeOn(Schedulers.single()) + .groupBy(new Function() { + @Override + public Integer apply(Integer i) throws Throwable { + return i % 2; + } + }) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(GroupedFlowable g) { + return g.getKey() == 0 + ? g + .parallel() + .runOn(Schedulers.computation()) + .map(Functions.identity()) + .sequential() + : g.map(Functions.identity()) // no need to use hide + ; + } + }) + .test() + .awaitDone(20, TimeUnit.SECONDS) + .assertValueCount(500000) + .assertComplete() + .assertNoErrors(); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java index 1c8763b33f..30c60a3ed8 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java @@ -29,6 +29,7 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.*; import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.flowables.GroupedFlowable; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.Functions; import io.reactivex.rxjava3.internal.fuseable.*; @@ -1972,4 +1973,20 @@ public void fusedNoConcurrentCleanDueToCancel() { } } } + + @Test + public void fusedParallelProcessing() { + Flowable.range(0, 500000) + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()) + .parallel() + .runOn(Schedulers.computation()) + .map(Functions.identity()) + .sequential() + .test() + .awaitDone(20, TimeUnit.SECONDS) + .assertValueCount(500000) + .assertComplete() + .assertNoErrors(); + } } From 72e2f4e9ea067157a4c2ddbb566ccbcf7955e0eb Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 20 Nov 2019 14:00:53 +0100 Subject: [PATCH 2/6] Remove accidental import --- .../internal/operators/flowable/FlowableObserveOnTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java index 30c60a3ed8..624ecee97a 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java @@ -29,7 +29,6 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.*; import io.reactivex.rxjava3.exceptions.*; -import io.reactivex.rxjava3.flowables.GroupedFlowable; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.Functions; import io.reactivex.rxjava3.internal.fuseable.*; From 155d24f83c7aecfa8b6b7ecbf79b01d880dede82 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 20 Nov 2019 14:10:29 +0100 Subject: [PATCH 3/6] Avoid calling `isEmpty` --- .../parallel/ParallelFromPublisher.java | 46 +++++++++---------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java index 126f7cdf13..5005f5e611 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java @@ -268,21 +268,9 @@ void drainAsync() { } } - boolean empty = q.isEmpty(); - - if (d && empty) { - for (Subscriber s : a) { - s.onComplete(); - } - return; - } - - if (empty) { - break; - } - long requestAtIndex = r.get(idx); long emissionAtIndex = e[idx]; + boolean empty = false; if (requestAtIndex != emissionAtIndex && r.get(n + idx) == 0) { T v; @@ -298,24 +286,34 @@ void drainAsync() { return; } - if (v == null) { - break; - } + empty = v == null; + if (!empty) { + a[idx].onNext(v); - a[idx].onNext(v); - - e[idx] = emissionAtIndex + 1; + e[idx] = emissionAtIndex + 1; - int c = ++consumed; - if (c == limit) { - consumed = 0; - upstream.request(c); + int c = ++consumed; + if (c == limit) { + consumed = 0; + upstream.request(c); + } + notReady = 0; } - notReady = 0; } else { notReady++; } + if (d && empty) { + for (Subscriber s : a) { + s.onComplete(); + } + return; + } + + if (empty) { + break; + } + idx++; if (idx == n) { idx = 0; From e7cbc7b1e47f18f326623ca59c5c96e2a2eb6b3d Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 20 Nov 2019 14:16:19 +0100 Subject: [PATCH 4/6] Undo some of the parallel changes --- .../parallel/ParallelFromPublisher.java | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java index 5005f5e611..22ffabc994 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java @@ -268,9 +268,20 @@ void drainAsync() { } } + boolean empty = q.isEmpty(); + if (d && empty) { + for (Subscriber s : a) { + s.onComplete(); + } + return; + } + + if (empty) { + break; + } + long requestAtIndex = r.get(idx); long emissionAtIndex = e[idx]; - boolean empty = false; if (requestAtIndex != emissionAtIndex && r.get(n + idx) == 0) { T v; @@ -286,8 +297,7 @@ void drainAsync() { return; } - empty = v == null; - if (!empty) { + if (v != null) { a[idx].onNext(v); e[idx] = emissionAtIndex + 1; @@ -303,17 +313,6 @@ void drainAsync() { notReady++; } - if (d && empty) { - for (Subscriber s : a) { - s.onComplete(); - } - return; - } - - if (empty) { - break; - } - idx++; if (idx == n) { idx = 0; From db4c78b6652241d22bc03c8180d24423e237ef21 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 20 Nov 2019 14:23:46 +0100 Subject: [PATCH 5/6] Undo all changes to ParallelFromPublisher --- .../parallel/ParallelFromPublisher.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java index 22ffabc994..d5b832c28c 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java @@ -269,6 +269,7 @@ void drainAsync() { } boolean empty = q.isEmpty(); + if (d && empty) { for (Subscriber s : a) { s.onComplete(); @@ -297,18 +298,20 @@ void drainAsync() { return; } - if (v != null) { - a[idx].onNext(v); + if (v == null) { + break; + } - e[idx] = emissionAtIndex + 1; + a[idx].onNext(v); - int c = ++consumed; - if (c == limit) { - consumed = 0; - upstream.request(c); - } - notReady = 0; + e[idx] = emissionAtIndex + 1; + + int c = ++consumed; + if (c == limit) { + consumed = 0; + upstream.request(c); } + notReady = 0; } else { notReady++; } @@ -434,4 +437,4 @@ void drain() { } } } -} +} \ No newline at end of file From 7c7b02806b9365c36f3f15aaa0c1fd9b4e6d97ea Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 20 Nov 2019 14:24:16 +0100 Subject: [PATCH 6/6] Again, undo --- .../internal/operators/parallel/ParallelFromPublisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java index d5b832c28c..126f7cdf13 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java @@ -437,4 +437,4 @@ void drain() { } } } -} \ No newline at end of file +}