Skip to content

Commit aeb5f2c

Browse files
authored
2.x: Fix MulticastProcessor not requesting more after limit is reached (#6715)
1 parent 52dee7d commit aeb5f2c

File tree

2 files changed

+38
-0
lines changed

2 files changed

+38
-0
lines changed

src/main/java/io/reactivex/processors/MulticastProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,7 @@ void drain() {
569569
}
570570
}
571571

572+
consumed = c;
572573
missed = wip.addAndGet(-missed);
573574
if (missed == 0) {
574575
break;

src/test/java/io/reactivex/processors/MulticastProcessorTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,4 +783,41 @@ public void noUpstream() {
783783
assertTrue(mp.hasSubscribers());
784784
}
785785

786+
@Test
787+
public void requestUpstreamPrefetchNonFused() {
788+
for (int j = 1; j < 12; j++) {
789+
MulticastProcessor<Integer> mp = MulticastProcessor.create(j, true);
790+
791+
TestSubscriber<Integer> ts = mp.test(0).withTag("Prefetch: " + j);
792+
793+
Flowable.range(1, 10).hide().subscribe(mp);
794+
795+
ts.assertEmpty()
796+
.requestMore(3)
797+
.assertValuesOnly(1, 2, 3)
798+
.requestMore(3)
799+
.assertValuesOnly(1, 2, 3, 4, 5, 6)
800+
.requestMore(4)
801+
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
802+
}
803+
}
804+
805+
@Test
806+
public void requestUpstreamPrefetchNonFused2() {
807+
for (int j = 1; j < 12; j++) {
808+
MulticastProcessor<Integer> mp = MulticastProcessor.create(j, true);
809+
810+
TestSubscriber<Integer> ts = mp.test(0).withTag("Prefetch: " + j);
811+
812+
Flowable.range(1, 10).hide().subscribe(mp);
813+
814+
ts.assertEmpty()
815+
.requestMore(2)
816+
.assertValuesOnly(1, 2)
817+
.requestMore(2)
818+
.assertValuesOnly(1, 2, 3, 4)
819+
.requestMore(6)
820+
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
821+
}
822+
}
786823
}

0 commit comments

Comments
 (0)