Skip to content

Commit 48e40a7

Browse files
committed
Basic interoperability between Java8 Streams and Scala collections.
This consists of: (1) toScala[Coll] extension methods for Java8 Streams; (2) seqStream and parStream extension methods for Scala collections; (3) a manually specialized set of Accumulators that let you quickly save a stream for multiple re-use, with (a) accumulate and accumulatePrimitive methods on Java8 Streams, (b) to[Coll] on Accumulators, and (c) .iterator, .toArray, and .toList as well. There is a lot of redundant code in the Accumulators, since it is difficult to pull out common functionality without endangering performance of the manually specialized cases. Tests written. Scaladocs written for the new classes. No top-level docs; feature is not complete.
1 parent 9253ed9 commit 48e40a7

File tree

7 files changed

+1730
-0
lines changed

7 files changed

+1730
-0
lines changed
Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
package scala.collection.mutable.java8
2+
3+
/** An `Accumulator` is a low-level collection specialized for gathering
  * elements in parallel and then joining them in order by merging Accumulators.
  * Accumulators can contain more than `Int.MaxValue` elements.
  *
  * Elements are appended to the `current` block.  When that block is full it is
  * moved into `history`, and `cumul(k)` records how many elements are stored in
  * `history(0)` through `history(k)` combined.
  *
  * @tparam A the element type; elements are stored as `AnyRef`
  */
final class Accumulator[A]
  extends AccumulatorLike[A, Accumulator[A]]
{ self =>
  /** Block currently being filled; slots `0 until index` hold elements. */
  private[java8] var current: Array[AnyRef] = Accumulator.emptyAnyRefArray
  /** Retired blocks, oldest first; slots `0 until hIndex` are in use. */
  private[java8] var history: Array[Array[AnyRef]] = Accumulator.emptyAnyRefArrayArray
  /** Running element totals, parallel to `history`. */
  private[java8] var cumul: Array[Long] = Accumulator.emptyLongArray

  /** Number of elements stored in `history(0)` through `history(i)`. */
  private[java8] def cumulative(i: Int): Long = cumul(i)

  /** Retires a non-empty `current` block into `history` and starts a fresh block. */
  private def expand(): Unit = {
    if (index > 0) {
      if (hIndex >= history.length) hExpand()
      history(hIndex) = current
      cumul(hIndex) = (if (hIndex > 0) cumulative(hIndex-1) else 0) + index
      hIndex += 1
    }
    current = new Array[AnyRef](nextBlockSize)
    index = 0
  }

  /** Grows `history` and `cumul`: four slots initially, doubling afterwards. */
  private def hExpand(): Unit = {
    if (hIndex == 0) {
      history = new Array[Array[AnyRef]](4)
      cumul = new Array[Long](4)
    }
    else {
      history = java.util.Arrays.copyOf(history, history.length << 1)
      cumul = java.util.Arrays.copyOf(cumul, cumul.length << 1)
    }
  }

  /** Appends an element to this `Accumulator`. */
  final def +=(a: A): Unit = {
    // totalSize is bumped first; expand() sizes the new block from it via nextBlockSize.
    totalSize += 1
    if (index >= current.length) expand()
    current(index) = a.asInstanceOf[AnyRef]
    index += 1
  }

  /** Removes all elements from `that` and appends them to this `Accumulator`.
    *
    * Leading blocks of `that` are copied into the free space of `current` for as
    * long as they fit.  Whatever does not fit is adopted by reference: the blocks
    * of `that` become blocks of this accumulator.  `that` is cleared afterwards.
    *
    * @param that the accumulator to empty into this one
    */
  final def drain[A1 <: A](that: Accumulator[A1]): Unit = {
    var h = 0
    var prev = 0L
    var more = true
    // Phase 1: copy whole history blocks of `that` into our spare capacity.
    while (more && h < that.hIndex) {
      val n = (that.cumulative(h) - prev).toInt
      if (current.length - index >= n) {
        System.arraycopy(that.history(h), 0, current, index, n)
        prev = that.cumulative(h)
        index += n
        h += 1
      }
      else more = false
    }
    if (h >= that.hIndex && current.length - index >= that.index) {
      // Everything fit: copy the tail block of `that` too.
      if (that.index > 0) System.arraycopy(that.current, 0, current, index, that.index)
      index += that.index
    }
    else {
      // Phase 2: adopt the remaining blocks of `that` without copying their contents.
      val slots = (if (index > 0) 1 else 0) + that.hIndex - h
      if (hIndex + slots > history.length) {
        // Power of two above the required slot count, never less than 4.
        val n = math.max(4, 1 << (32 - java.lang.Integer.numberOfLeadingZeros(1 + hIndex + slots)))
        history = java.util.Arrays.copyOf(history, n)
        cumul = java.util.Arrays.copyOf(cumul, n)
      }
      var pv = (if (hIndex > 0) cumulative(hIndex-1) else 0L)
      if (index > 0) {
        pv += index
        cumul(hIndex) = pv
        // A large block that is less than one-eighth full is trimmed before being retired.
        history(hIndex) = (if (index < (current.length >>> 3) && current.length > 32) java.util.Arrays.copyOf(current, index) else current)
        hIndex += 1
      }
      while (h < that.hIndex) {
        pv += that.cumulative(h) - prev
        prev = that.cumulative(h)
        cumul(hIndex) = pv
        history(hIndex) = that.history(h)
        h += 1
        hIndex += 1
      }
      index = that.index
      current = that.current
    }
    totalSize += that.totalSize
    that.clear()
  }

  /** Removes all elements and releases the backing arrays. */
  override def clear(): Unit = {
    super.clear()
    current = Accumulator.emptyAnyRefArray
    history = Accumulator.emptyAnyRefArrayArray
    cumul = Accumulator.emptyLongArray
  }

  /** Retrieves the `ix`th element.
    *
    * No bounds check is performed here: an index outside `0 until size` either
    * fails inside the array access or reads a slot that holds no element.
    */
  final def apply(ix: Long): A = {
    if (totalSize - ix <= index || hIndex == 0) current((ix - (totalSize - index)).toInt).asInstanceOf[A]
    else {
      // seekSlot packs the history block number in the high 32 bits and the offset in the low 32.
      val w = seekSlot(ix)
      history((w >>> 32).toInt)((w & 0xFFFFFFFFL).toInt).asInstanceOf[A]
    }
  }

  /** Retrieves the `ix`th element, using an `Int` index. */
  final def apply(i: Int): A = apply(i.toLong)

  /** Returns an `Iterator` over the contents of this `Accumulator`.
    *
    * The iterator reads this accumulator's state as it goes; `next()` throws
    * `NoSuchElementException` once `hasNext` is false.
    */
  final def iterator: Iterator[A] = new Iterator[A] {
    private var consumed = 0L                                         // elements handed out so far
    private var hix = 0                                               // history block being read
    private var ix = 0                                                // number of valid elements in `a`
    private var i = 0                                                 // read position within `a`
    private var a: Array[AnyRef] = Accumulator.emptyAnyRefArray       // block being read
    /** Positions `a`, `i` and `ix` on the block that contains element number `consumed`. */
    @annotation.tailrec private def loadMore(): Unit = {
      if (hix >= self.hIndex) {
        a = self.current
        val pv = (if (self.hIndex == 0) 0L else self.cumulative(self.hIndex-1))
        ix = self.index
        i = (consumed - pv).toInt
      }
      else {
        val ch = self.cumulative(hix)
        if (ch > consumed) {
          a = self.history(hix)
          val pv = (if (hix == 0) 0L else self.cumulative(hix-1))
          ix = (ch - pv).toInt
          i = (consumed - pv).toInt
        }
        else {
          hix += 1
          loadMore()
        }
      }
    }
    def hasNext: Boolean = consumed < self.totalSize
    def next(): A = {
      // Without this guard an exhausted iterator would read an unused slot of `current`.
      if (consumed >= self.totalSize) throw new NoSuchElementException("next on empty iterator")
      if (i >= ix) loadMore()
      val ans = a(i)
      i += 1
      consumed += 1
      ans.asInstanceOf[A]
    }
  }

  /** Returns a `java.util.Spliterator` over the contents of this `Accumulator`*/
  final def spliterator: java.util.Spliterator[A] = new AccumulatorSpliterator[A](this)

  /** Produces a sequential Java 8 Stream over the elements of this `Accumulator`*/
  final def seqStream: java.util.stream.Stream[A] = java.util.stream.StreamSupport.stream(spliterator, false)

  /** Produces a parallel Java 8 Stream over the elements of this `Accumulator`*/
  final def parStream: java.util.stream.Stream[A] = java.util.stream.StreamSupport.stream(spliterator, true)

  /** Copies the elements in this `Accumulator` into an `Array`.
    *
    * @throws IllegalArgumentException if more than `Int.MaxValue` elements are held
    */
  final def toArray(implicit tag: reflect.ClassTag[A]): Array[A] = {
    if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for an array: "+totalSize.toString)
    val a = new Array[A](totalSize.toInt)
    var j = 0
    var h = 0
    var pv = 0L
    // Retired blocks first, in order ...
    while (h < hIndex) {
      val x = history(h)
      val n = cumulative(h) - pv
      pv = cumulative(h)
      var i = 0
      while (i < n) {
        a(j) = x(i).asInstanceOf[A]
        i += 1
        j += 1
      }
      h += 1
    }
    // ... then the block still being filled.
    var i = 0
    while (i < index) {
      a(j) = current(i).asInstanceOf[A]
      i += 1
      j += 1
    }
    a
  }

  /** Copies the elements in this `Accumulator` to a `List` */
  final def toList: List[A] = {
    // Built back to front so each element is prepended.
    var ans: List[A] = Nil
    var i = index - 1
    while (i >= 0) {
      ans = current(i).asInstanceOf[A] :: ans
      i -= 1
    }
    var h = hIndex - 1
    while (h >= 0) {
      val a = history(h)
      i = (cumulative(h) - (if (h == 0) 0L else cumulative(h-1))).toInt - 1
      while (i >= 0) {
        ans = a(i).asInstanceOf[A] :: ans
        i -= 1
      }
      h -= 1
    }
    ans
  }

  /** Copies the elements in this `Accumulator` to a specified collection.
    * Usage example: `acc.to[Vector]`
    *
    * @throws IllegalArgumentException if more than `Int.MaxValue` elements are held
    */
  final def to[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, A, Coll[A]]): Coll[A] = {
    if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for a Scala collection: "+totalSize.toString)
    val b = cbf()
    b.sizeHint(totalSize.toInt)
    var h = 0
    var pv = 0L
    while (h < hIndex) {
      val x = history(h)
      val n = cumulative(h) - pv
      pv = cumulative(h)
      var i = 0
      while (i < n) {
        b += x(i).asInstanceOf[A]
        i += 1
      }
      h += 1
    }
    var i = 0
    while (i < index) {
      b += current(i).asInstanceOf[A]
      i += 1
    }
    b.result()
  }
}
237+
/** Shared empty arrays and `java.util.stream.Stream.collect` helpers for [[Accumulator]]. */
object Accumulator {
  // Shared zero-length arrays used as the initial and cleared storage of every Accumulator.
  private val emptyAnyRefArray = new Array[AnyRef](0)
  private val emptyAnyRefArrayArray = new Array[Array[AnyRef]](0)
  private val emptyLongArray = new Array[Long](0)

  /** A `Supplier` of `Accumulator`s, suitable for use with `java.util.stream.Stream`'s `collect` method. */
  def supplier[A]: java.util.function.Supplier[Accumulator[A]] =
    new java.util.function.Supplier[Accumulator[A]] {
      def get: Accumulator[A] = new Accumulator[A]
    }

  /** A `BiConsumer` that adds an element to an `Accumulator`, suitable for use with `java.util.stream.Stream`'s `collect` method. */
  def adder[A]: java.util.function.BiConsumer[Accumulator[A], A] =
    new java.util.function.BiConsumer[Accumulator[A], A] {
      def accept(ac: Accumulator[A], a: A): Unit = { ac += a }
    }

  /** A `BiConsumer` that merges `Accumulator`s, suitable for use with `java.util.stream.Stream`'s `collect` method. */
  def merger[A]: java.util.function.BiConsumer[Accumulator[A], Accumulator[A]] =
    new java.util.function.BiConsumer[Accumulator[A], Accumulator[A]] {
      def accept(a1: Accumulator[A], a2: Accumulator[A]): Unit = { a1 drain a2 }
    }
}
251+
252+
/** A `java.util.Spliterator` that walks the blocks of an [[Accumulator]] in order.
  *
  * The spliterator reads the accumulator's arrays directly and keeps only a cursor.
  *
  * @param acc the accumulator whose elements are traversed
  */
private[java8] class AccumulatorSpliterator[A](private val acc: Accumulator[A]) extends java.util.Spliterator[A] {
  import java.util.Spliterator._

  private var h: Int = 0                                                        // block number; values >= acc.hIndex select acc.current
  private var i: Int = 0                                                        // read position within `a`
  private var a: Array[AnyRef] = if (acc.hIndex > 0) acc.history(0) else acc.current  // block being read
  private var n: Long = if (acc.hIndex > 0) acc.cumulative(0) else acc.index    // number of valid elements in `a`
  private var N: Long = acc.totalSize                                           // elements this spliterator still has to deliver

  /** Creates a spliterator with the same cursor as this one that will deliver `limit` elements. */
  private def duplicateSelf(limit: Long = N): AccumulatorSpliterator[A] = {
    val ans = new AccumulatorSpliterator(acc)
    ans.h = h
    ans.i = i
    ans.a = a
    ans.n = n
    ans.N = limit
    ans
  }

  /** Moves the cursor to the start of the next block. */
  private def loadMore(): Unit = {
    h += 1
    if (h < acc.hIndex) { a = acc.history(h); n = acc.cumulative(h) - acc.cumulative(h-1) }
    else { a = acc.current; n = acc.index }
    i = 0
  }

  /** Reports `ORDERED`, `SIZED` and `SUBSIZED`. */
  def characteristics: Int = ORDERED | SIZED | SUBSIZED

  /** The number of elements this spliterator has left to deliver. */
  def estimateSize: Long = N

  /** Passes every remaining element to `f`, one block at a time. */
  override def forEachRemaining(f: java.util.function.Consumer[_ >: A]): Unit = {
    while (N > 0) {
      if (i >= n) loadMore()
      val i0 = i
      // Do not run past this spliterator's share when the block holds more than N elements.
      if ((n-i) > N) n = i + N.toInt
      while (i < n) {
        f.accept(a(i).asInstanceOf[A])
        i += 1
      }
      N -= (n - i0)
    }
  }

  /** Passes the next element to `f` if one remains; returns whether it did. */
  def tryAdvance(f: java.util.function.Consumer[_ >: A]): Boolean =
    if (N <= 0) false
    else {
      if (i >= n) loadMore()
      f.accept(a(i).asInstanceOf[A])
      i += 1
      N -= 1
      true
    }

  /** Splits off the first `N >> 1` remaining elements into a new spliterator.
    *
    * Returns `null` (as the `Spliterator` contract specifies) when at most one
    * element remains.  Otherwise the result keeps the present cursor, and this
    * spliterator jumps ahead to the element following the split-off prefix.
    */
  def trySplit: java.util.Spliterator[A] =
    if (N <= 1) null
    else {
      val half = (N >> 1)
      val M = (if (h <= 0) 0L else acc.cumulative(h-1)) + i   // absolute position of the cursor
      val R = M + half                                        // absolute position where this half resumes
      val ans = duplicateSelf(half)
      if (h < acc.hIndex) {
        // seekSlot packs the block number in the high 32 bits and the offset in the low 32.
        val w = acc.seekSlot(R)
        h = (w >>> 32).toInt
        if (h < acc.hIndex) {
          a = acc.history(h)
          n = acc.cumulative(h) - (if (h > 0) acc.cumulative(h-1) else 0)
        }
        else {
          a = acc.current
          n = acc.index
        }
        i = (w & 0xFFFFFFFFL).toInt
      }
      else i += half.toInt    // already inside acc.current: just skip ahead
      N -= half
      ans
    }

  override def toString = s"$h $i ${a.mkString("{",",","}")} $n $N"
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package scala.collection.mutable.java8
2+
3+
/** An accumulator that works with Java 8 streams; it accepts elements of type `A`,
  * is itself an `AC`.  Accumulators can handle more than `Int.MaxValue` elements.
  *
  * This trait holds the bookkeeping shared by accumulators: the fill position of
  * the block being written, the number of retired blocks, and the total size.
  *
  * @tparam A  the element type
  * @tparam AC the concrete accumulator type
  */
trait AccumulatorLike[@specialized(Double, Int, Long) A, AC] {
  /** Number of elements written to the block currently being filled. */
  private[java8] var index: Int = 0
  /** Number of retired blocks. */
  private[java8] var hIndex: Int = 0
  /** Total number of elements accumulated. */
  private[java8] var totalSize: Long = 0L
  /** Number of elements held in retired blocks `0` through `i` combined. */
  private[java8] def cumulative(i: Int): Long

  /** Chooses the size of the next block from `totalSize`: 16 below 32 elements,
    * a power of two that grows with the size up to `Int.MaxValue` elements,
    * and `1 << 24` beyond that.
    */
  private[java8] def nextBlockSize: Int = {
    if (totalSize < 32) 16
    else if (totalSize <= Int.MaxValue) {
      val bit = (64 - java.lang.Long.numberOfLeadingZeros(totalSize))
      1 << (bit - (bit >> 2))
    }
    else 1 << 24
  }

  /** Size of the accumulated collection, as a `Long` */
  final def size: Long = totalSize

  /** Remove all accumulated elements from this accumulator. */
  def clear(): Unit = {
    index = 0
    hIndex = 0
    totalSize = 0L
  }

  /** Locates element number `ix` by binary search over `cumulative`.
    *
    * @return the block number in the high 32 bits and the offset within that block
    *         in the low 32 bits; a block number equal to `hIndex` means the element
    *         lies beyond the retired blocks
    */
  private[java8] def seekSlot(ix: Long): Long = {
    var lo = -1
    var hi = hIndex
    // Invariant: every block up to `lo` ends at or before `ix`; block `hi` (if retired) ends after it.
    while (lo + 1 < hi) {
      val m = (lo + hi) >>> 1    // Shift allows division-as-unsigned, prevents overflow
      if (cumulative(m) > ix) hi = m
      else lo = m
    }
    (hi.toLong << 32) | (if (hi==0) ix else (ix - cumulative(hi-1))).toInt
  }
}

0 commit comments

Comments
 (0)