From d0a3ab1be1eb0700351a1ef3a7d03a050611a863 Mon Sep 17 00:00:00 2001 From: Rex Kerr Date: Mon, 13 Jul 2015 16:13:56 -0700 Subject: [PATCH] Basic interoperability between Java8 Streams and Scala collections. This consists of (1) toScala[Coll] extension methods for Java8 Streams (2) seqStream and parStream extension methods for Scala collections (3) A manually specialized set of Accumulators that let you quickly save a stream for multiple re-use. (a) accumulate and accumulatePrimitive methods on Java8 Streams (b) to[Coll] on Accumulators (c) .iterator, .toArray, .toList also There is a lot of redundant code in the Accumulators since it is difficult to pull out common functionality without endangering performance of the manually specialized cases. Tests written. Scaladocs written for the new classes. No top-level docs; feature is not complete. --- .../scala/compat/java8/StreamConverters.scala | 226 ++++++++++++ .../java8/collectionCompat/Accumulator.scala | 331 +++++++++++++++++ .../collectionCompat/AccumulatorLike.scala | 41 +++ .../collectionCompat/DoubleAccumulator.scala | 327 +++++++++++++++++ .../collectionCompat/IntAccumulator.scala | 333 ++++++++++++++++++ .../collectionCompat/LongAccumulator.scala | 327 +++++++++++++++++ .../compat/java8/StreamConvertersTest.scala | 145 ++++++++ 7 files changed, 1730 insertions(+) create mode 100644 src/main/scala/scala/compat/java8/StreamConverters.scala create mode 100644 src/main/scala/scala/compat/java8/collectionCompat/Accumulator.scala create mode 100644 src/main/scala/scala/compat/java8/collectionCompat/AccumulatorLike.scala create mode 100644 src/main/scala/scala/compat/java8/collectionCompat/DoubleAccumulator.scala create mode 100644 src/main/scala/scala/compat/java8/collectionCompat/IntAccumulator.scala create mode 100644 src/main/scala/scala/compat/java8/collectionCompat/LongAccumulator.scala create mode 100644 src/test/scala/scala/compat/java8/StreamConvertersTest.scala diff --git a/src/main/scala/scala/compat/java8/StreamConverters.scala b/src/main/scala/scala/compat/java8/StreamConverters.scala new file mode 100644 index 0000000..1856d71 --- /dev/null +++ b/src/main/scala/scala/compat/java8/StreamConverters.scala @@ -0,0 +1,226 @@ +package scala.compat.java8 + +import language.implicitConversions + +import java.util.stream._ +import scala.compat.java8.collectionCompat._ + +trait PrimitiveStreamAccumulator[S, AA] { + def streamAccumulate(stream: S): AA +} + +trait PrimitiveStreamUnboxer[A, S] { + def apply(boxed: Stream[A]): S +} + +trait Priority2StreamConverters { + implicit class EnrichAnyScalaCollectionWithStream[A](t: TraversableOnce[A]) { + private def mkAcc() = { + val acc = new Accumulator[A] + t.foreach{ acc += _ } + acc + } + + def seqStream: Stream[A] = mkAcc().seqStream + + def parStream: Stream[A] = mkAcc().parStream + } + + implicit class EnrichMissingPrimitiveArrayWithStream[A](a: Array[A]) { + private def mkAcc() = { + val acc = new Accumulator[A] + var i = 0 + while (i < a.length) { + acc += a(i) + i += 1 + } + acc + } + + def seqStream: Stream[A] = mkAcc().seqStream + + def parStream: Stream[A] = mkAcc().parStream + } +} + +trait Priority1StreamConverters extends Priority2StreamConverters { + implicit class EnrichScalaCollectionWithStream[A <: AnyRef](t: TraversableOnce[A]) { + private def mkArr()(implicit tag: reflect.ClassTag[A]): Array[A] = { + if (t.isTraversableAgain && t.hasDefiniteSize) { + val sz = t.size + val a = new Array[A](sz) + t.copyToArray(a, 0, sz) + a + } + else t.toArray[A] + } + + def seqStream(implicit tag: reflect.ClassTag[A]): Stream[A] = + java.util.Arrays.stream(mkArr()) + + def parStream(implicit tag: reflect.ClassTag[A]): Stream[A] = seqStream.parallel + } + + implicit class EnrichGenericArrayWithStream[A <: AnyRef](a: Array[A]) { + def seqStream: Stream[A] = java.util.Arrays.stream(a) + def parStream: Stream[A] = seqStream.parallel + } + + implicit class RichStream[A](stream: Stream[A]) { + def accumulate = stream.collect(Accumulator.supplier[A], Accumulator.adder[A], Accumulator.merger[A]) + + def toScala[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, A, Coll[A]]): Coll[A] = { + if (stream.isParallel) accumulate.to[Coll](cbf) + else { + val b = cbf() + stream.forEachOrdered(new java.util.function.Consumer[A]{ def accept(a: A) { b += a } }) + b.result() + } + } + + def unboxed[S](implicit ubx: PrimitiveStreamUnboxer[A, S]): S = ubx(stream) + } + + implicit class RichStreamCanAccumulatePrimitive[S](stream: S) { + def accumulatePrimitive[AA](implicit psa: PrimitiveStreamAccumulator[S, AA]) = psa.streamAccumulate(stream) + } +} + +/** `StreamConverters` provides extension methods and other functionality to + * ease interoperability of Scala collections with `java.util.stream` classes. + * + * Scala collections gain extension methods `seqStream` and + * `parStream` that allow them to be used as the source of a `Stream`. + * + * `Array` also gains `seqStream` and `parStream` methods, and calling those + * on `Array[Double]`, `Array[Int]`, or `Array[Long]` will produce the + * corresponding primitive stream. + * + * Streams gain `accumulate` and `toScala[_]` methods, which collect the stream + * into a custom high-performance `scala.collection.mutable.java8.Accumulator`, + * which is not part of the standard collections hierarchy, or into a named + * Scala collection, respectively. + * + * Generic streams also gain an `unboxed` method that will convert to the + * corresponding unboxed primitive stream, if appropriate. Unboxed streams + * have custom accumulators with improved performance. + * + * Accumulators have `toArray`, `toList`, `iterator`, and `to[_]` methods + * to convert to standard Scala collections. + * + * Example: + * ``` + * import scala.compat.java8.StreamConverers._ + * + * val s = Vector(1,2,3,4).parStream // Stream[Int] + * val si = s.unboxed // Stream.OfInt + * val ai = si.accumulate // IntAccumulator + * val v = ai.to[Vector] // Vector[Int] again + * + * val t = Array(2.0, 3.0, 4.0).parStream // DoubleStream + * val q = t.toScala[scala.collection.immutable.Queue] // Queue[Double] + * ``` + */ +object StreamConverters extends Priority1StreamConverters { + implicit class EnrichDoubleArrayWithStream(a: Array[Double]) { + def seqStream: DoubleStream = java.util.Arrays.stream(a) + def parStream: DoubleStream = seqStream.parallel + } + + implicit class EnrichIntArrayWithStream(a: Array[Int]) { + def seqStream: IntStream = java.util.Arrays.stream(a) + def parStream: IntStream = seqStream.parallel + } + + implicit class EnrichLongArrayWithStream(a: Array[Long]) { + def seqStream: LongStream = java.util.Arrays.stream(a) + def parStream: LongStream = seqStream.parallel + } + + implicit val primitiveAccumulateDoubleStream = new PrimitiveStreamAccumulator[Stream[Double], DoubleAccumulator] { + def streamAccumulate(stream: Stream[Double]): DoubleAccumulator = + stream.collect(DoubleAccumulator.supplier, DoubleAccumulator.boxedAdder, DoubleAccumulator.merger) + } + + implicit val primitiveAccumulateDoubleStream2 = + primitiveAccumulateDoubleStream.asInstanceOf[PrimitiveStreamAccumulator[Stream[java.lang.Double], DoubleAccumulator]] + + implicit val primitiveUnboxDoubleStream = new PrimitiveStreamUnboxer[Double, DoubleStream] { + def apply(boxed: Stream[Double]): DoubleStream = + boxed.mapToDouble(new java.util.function.ToDoubleFunction[Double]{ def applyAsDouble(d: Double) = d }) + } + + implicit val primitiveUnboxDoubleStream2 = + primitiveUnboxDoubleStream.asInstanceOf[PrimitiveStreamUnboxer[java.lang.Double, DoubleStream]] + + implicit val primitiveAccumulateIntStream = new PrimitiveStreamAccumulator[Stream[Int], IntAccumulator] { + def streamAccumulate(stream: Stream[Int]): IntAccumulator = + stream.collect(IntAccumulator.supplier, IntAccumulator.boxedAdder, IntAccumulator.merger) + } + + implicit val primitiveAccumulateIntStream2 = + primitiveAccumulateIntStream.asInstanceOf[PrimitiveStreamAccumulator[Stream[java.lang.Integer], IntAccumulator]] + + implicit val primitiveUnboxIntStream = new PrimitiveStreamUnboxer[Int, IntStream] { + def apply(boxed: Stream[Int]): IntStream = + boxed.mapToInt(new java.util.function.ToIntFunction[Int]{ def applyAsInt(d: Int) = d }) + } + + implicit val primitiveUnboxIntStream2 = + primitiveUnboxIntStream.asInstanceOf[PrimitiveStreamUnboxer[java.lang.Integer, IntStream]] + + implicit val primitiveAccumulateLongStream = new PrimitiveStreamAccumulator[Stream[Long], LongAccumulator] { + def streamAccumulate(stream: Stream[Long]): LongAccumulator = + stream.collect(LongAccumulator.supplier, LongAccumulator.boxedAdder, LongAccumulator.merger) + } + + implicit val primitiveAccumulateLongStream2 = + primitiveAccumulateLongStream.asInstanceOf[PrimitiveStreamAccumulator[Stream[java.lang.Long], LongAccumulator]] + + implicit val primitiveUnboxLongStream = new PrimitiveStreamUnboxer[Long, LongStream] { + def apply(boxed: Stream[Long]): LongStream = + boxed.mapToLong(new java.util.function.ToLongFunction[Long]{ def applyAsLong(d: Long) = d }) + } + + implicit val primitiveUnboxLongStream2 = + primitiveUnboxLongStream.asInstanceOf[PrimitiveStreamUnboxer[java.lang.Long, LongStream]] + + implicit class RichDoubleStream(stream: DoubleStream) { + def accumulate = stream.collect(DoubleAccumulator.supplier, DoubleAccumulator.adder, DoubleAccumulator.merger) + + def toScala[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, Double, Coll[Double]]): Coll[Double] = { + if (stream.isParallel) accumulate.to[Coll](cbf) + else { + val b = cbf() + stream.forEachOrdered(new java.util.function.DoubleConsumer{ def accept(d: Double) { b += d } }) + b.result() + } + } + } + + implicit class RichIntStream(stream: IntStream) { + def accumulate = stream.collect(IntAccumulator.supplier, IntAccumulator.adder, IntAccumulator.merger) + + def toScala[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, Int, Coll[Int]]): Coll[Int] = { + if (stream.isParallel) accumulate.to[Coll](cbf) + else { + val b = cbf() + stream.forEachOrdered(new java.util.function.IntConsumer{ def accept(d: Int) { b += d } }) + b.result() + } + } + } + + implicit class RichLongStream(stream: LongStream) { + def accumulate = stream.collect(LongAccumulator.supplier, LongAccumulator.adder, LongAccumulator.merger) + + def toScala[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, Long, Coll[Long]]): Coll[Long] = { + if (stream.isParallel) accumulate.to[Coll](cbf) + else { + val b = cbf() + stream.forEachOrdered(new java.util.function.LongConsumer{ def accept(d: Long) { b += d } }) + b.result() + } + } + } +} diff --git a/src/main/scala/scala/compat/java8/collectionCompat/Accumulator.scala b/src/main/scala/scala/compat/java8/collectionCompat/Accumulator.scala new file mode 100644 index 0000000..de93806 --- /dev/null +++ b/src/main/scala/scala/compat/java8/collectionCompat/Accumulator.scala @@ -0,0 +1,331 @@ +package scala.compat.java8.collectionCompat + +/** An `Accumulator` is a low-level collection specialized for gathering + * elements in parallel and then joining them in order by merging Accumulators. + * Accumulators can contain more than `Int.MaxValue` elements. + */ +final class Accumulator[A] +extends AccumulatorLike[A, Accumulator[A]] +{ self => + private[java8] var current: Array[AnyRef] = Accumulator.emptyAnyRefArray + private[java8] var history: Array[Array[AnyRef]] = Accumulator.emptyAnyRefArrayArray + private[java8] var cumul: Array[Long] = Accumulator.emptyLongArray + + private[java8] def cumulative(i: Int) = cumul(i) + + private def expand(): Unit = { + if (index > 0) { + if (hIndex >= history.length) hExpand() + history(hIndex) = current + cumul(hIndex) = (if (hIndex > 0) cumulative(hIndex-1) else 0) + index + hIndex += 1 + } + current = new Array[AnyRef](nextBlockSize) + index = 0 + } + + private def hExpand(): Unit = { + if (hIndex == 0) { + history = new Array[Array[AnyRef]](4) + cumul = new Array[Long](4) + } + else { + history = java.util.Arrays.copyOf(history, history.length << 1) + cumul = java.util.Arrays.copyOf(cumul, cumul.length << 1) + } + } + + /** Appends an element to this `Accumulator`. */ + final def +=(a: A): Unit = { + totalSize += 1 + if (index >= current.length) expand() + current(index) = a.asInstanceOf[AnyRef] + index += 1 + } + + /** Removes all elements from `that` and appends them to this `Accumulator`. */ + final def drain[A1 <: A](that: Accumulator[A1]): Unit = { + var h = 0 + var prev = 0L + var more = true + while (more && h < that.hIndex) { + val n = (that.cumulative(h) - prev).toInt + if (current.length - index >= n) { + System.arraycopy(that.history(h), 0, current, index, n) + prev = that.cumulative(h) + index += n + h += 1 + } + else more = false + } + if (h >= that.hIndex && current.length - index >= that.index) { + if (that.index > 0) System.arraycopy(that.current, 0, current, index, that.index) + index += that.index + } + else { + val slots = (if (index > 0) 1 else 0) + that.hIndex - h + if (hIndex + slots > history.length) { + val n = math.max(4, 1 << (32 - java.lang.Integer.numberOfLeadingZeros(1 + hIndex + slots))) + history = java.util.Arrays.copyOf(history, n) + cumul = java.util.Arrays.copyOf(cumul, n) + } + var pv = (if (hIndex > 0) cumulative(hIndex-1) else 0L) + if (index > 0) { + pv += index + cumul(hIndex) = pv + history(hIndex) = (if (index < (current.length >>> 3) && current.length > 32) java.util.Arrays.copyOf(current, index) else current) + hIndex += 1 + } + while (h < that.hIndex) { + pv += that.cumulative(h) - prev + prev = that.cumulative(h) + cumul(hIndex) = pv + history(hIndex) = that.history(h) + h += 1 + hIndex += 1 + } + index = that.index + current = that.current + } + totalSize += that.totalSize + that.clear + } + + override def clear(): Unit = { + super.clear() + current = Accumulator.emptyAnyRefArray + history = Accumulator.emptyAnyRefArrayArray + cumul = Accumulator.emptyLongArray + } + + /** Retrieves the `ix`th element. */ + final def apply(ix: Long): A = { + if (totalSize - ix <= index || hIndex == 0) current((ix - (totalSize - index)).toInt).asInstanceOf[A] + else { + val w = seekSlot(ix) + history((w >>> 32).toInt)((w & 0xFFFFFFFFL).toInt).asInstanceOf[A] + } + } + + /** Retrieves the `ix`th element, using an `Int` index. */ + final def apply(i: Int): A = apply(i.toLong) + + /** Returns an `Iterator` over the contents of this `Accumulator`. */ + final def iterator = new Iterator[A] { + private var consumed = 0L + private var hix = 0 + private var ix = 0 + private var i = 0 + private var a: Array[AnyRef] = Accumulator.emptyAnyRefArray + @annotation.tailrec private def loadMore() { + if (hix >= self.hIndex) { + a = self.current + val pv = (if (self.hIndex == 0) 0L else self.cumulative(self.hIndex-1)) + ix = self.index + i = (consumed - pv).toInt + } + else { + val ch = self.cumulative(hix) + if (ch > consumed) { + a = self.history(hix) + val pv = (if (hix == 0) 0L else self.cumulative(hix-1)) + ix = (ch - pv).toInt + i = (consumed - pv).toInt + } + else { + hix += 1 + loadMore() + } + } + } + def hasNext = consumed < self.totalSize + def next = { + if (i >= ix) loadMore() + val ans = a(i) + i += 1 + consumed += 1 + ans.asInstanceOf[A] + } + } + + /** Returns a `java.util.Spliterator` over the contents of this `Accumulator`*/ + final def spliterator: java.util.Spliterator[A] = new AccumulatorSpliterator[A](this) + + /** Produces a sequential Java 8 Stream over the elements of this `Accumulator`*/ + final def seqStream: java.util.stream.Stream[A] = java.util.stream.StreamSupport.stream(spliterator, false) + + /** Produces a parallel Java 8 Stream over the elements of this `Accumulator`*/ + final def parStream: java.util.stream.Stream[A] = java.util.stream.StreamSupport.stream(spliterator, true) + + /** Copies the elements in this `Accumulator` into an `Array` */ + final def toArray(implicit tag: reflect.ClassTag[A]) = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for an array: "+totalSize.toString) + val a = new Array[A](totalSize.toInt) + var j = 0 + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val n = cumulative(h) - pv + pv = cumulative(h) + var i = 0 + while (i < n) { + a(j) = x(i).asInstanceOf[A] + i += 1 + j += 1 + } + h += 1 + } + var i = 0 + while (i < index) { + a(j) = current(i).asInstanceOf[A] + i += 1 + j += 1 + } + a + } + + /** Copies the elements in this `Accumulator` to a `List` */ + final def toList: List[A] = { + var ans: List[A] = Nil + var i = index - 1 + while (i >= 0) { + ans = current(i).asInstanceOf[A] :: ans + i -= 1 + } + var h = hIndex - 1 + while (h >= 0) { + val a = history(h) + i = (cumulative(h) - (if (h == 0) 0L else cumulative(h-1))).toInt - 1 + while (i >= 0) { + ans = a(i).asInstanceOf[A] :: ans + i -= 1 + } + h -= 1 + } + ans + } + + /** Copies the elements in this `Accumulator` to a specified collection. + * Usage example: `acc.to[Vector]` + */ + final def to[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, A, Coll[A]]): Coll[A] = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for a Scala collection: "+totalSize.toString) + val b = cbf() + b.sizeHint(totalSize.toInt) + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val n = cumulative(h) - pv + pv = cumulative(h) + var i = 0 + while (i < n) { + b += x(i).asInstanceOf[A] + i += 1 + } + h += 1 + } + var i = 0 + while (i < index) { + b += current(i).asInstanceOf[A] + i += 1 + } + b.result + } +} +object Accumulator { + private val emptyAnyRefArray = new Array[AnyRef](0) + private val emptyAnyRefArrayArray = new Array[Array[AnyRef]](0) + private val emptyLongArray = new Array[Long](0) + + /** A `Supplier` of `Accumulator`s, suitable for use with `java.util.stream.Stream`'s `collect` method. */ + def supplier[A] = new java.util.function.Supplier[Accumulator[A]]{ def get: Accumulator[A] = new Accumulator[A] } + + /** A `BiConsumer` that adds an element to an `Accumulator`, suitable for use with `java.util.stream.Stream`'s `collect` method. */ + def adder[A] = new java.util.function.BiConsumer[Accumulator[A], A]{ def accept(ac: Accumulator[A], a: A) { ac += a } } + + /** A `BiConsumer` that merges `Accumulator`s, suitable for use with `java.util.stream.Stream`'s `collect` method. */ + def merger[A] = new java.util.function.BiConsumer[Accumulator[A], Accumulator[A]]{ def accept(a1: Accumulator[A], a2: Accumulator[A]) { a1 drain a2 } } +} + +private[java8] class AccumulatorSpliterator[A](private val acc: Accumulator[A]) extends java.util.Spliterator[A] { + import java.util.Spliterator._ + + private var h = 0 + private var i = 0 + private var a = if (acc.hIndex > 0) acc.history(0) else acc.current + private var n = if (acc.hIndex > 0) acc.cumulative(0) else acc.index + private var N = acc.totalSize + + private def duplicateSelf(limit: Long = N): AccumulatorSpliterator[A] = { + val ans = new AccumulatorSpliterator(acc) + ans.h = h + ans.i = i + ans.a = a + ans.n = n + ans.N = limit + ans + } + + private def loadMore(): Unit = { + h += 1 + if (h < acc.hIndex) { a = acc.history(h); n = acc.cumulative(h) - acc.cumulative(h-1) } + else { a = acc.current; n = acc.index } + i = 0 + } + + def characteristics = ORDERED | SIZED | SUBSIZED + + def estimateSize = N + + override def forEachRemaining(f: java.util.function.Consumer[_ >: A]) { + while (N > 0) { + if (i >= n) loadMore() + val i0 = i + if ((n-i) > N) n = i + N.toInt + while (i < n) { + f.accept(a(i).asInstanceOf[A]) + i += 1 + } + N -= (n - i0) + } + } + + def tryAdvance(f: java.util.function.Consumer[_ >: A]): Boolean = + if (N <= 0) false + else { + if (i >= n) loadMore() + f.accept(a(i).asInstanceOf[A]) + i += 1 + N -= 1 + true + } + + def trySplit: java.util.Spliterator[A] = + if (N <= 1) null + else { + val half = (N >> 1) + val M = (if (h <= 0) 0L else acc.cumulative(h-1)) + i + val R = M + half + val ans = duplicateSelf(half) + if (h < acc.hIndex) { + val w = acc.seekSlot(R) + h = (w >>> 32).toInt + if (h < acc.hIndex) { + a = acc.history(h) + n = acc.cumulative(h) - (if (h > 0) acc.cumulative(h-1) else 0) + } + else { + a = acc.current + n = acc.index + } + i = (w & 0xFFFFFFFFL).toInt + } + else i += half.toInt + N -= half + ans + } + + override def toString = s"$h $i ${a.mkString("{",",","}")} $n $N" +} diff --git a/src/main/scala/scala/compat/java8/collectionCompat/AccumulatorLike.scala b/src/main/scala/scala/compat/java8/collectionCompat/AccumulatorLike.scala new file mode 100644 index 0000000..75de367 --- /dev/null +++ b/src/main/scala/scala/compat/java8/collectionCompat/AccumulatorLike.scala @@ -0,0 +1,41 @@ +package scala.compat.java8.collectionCompat + +/** An accumulator that works with Java 8 streams; it accepts elements of type `A`, + * is itself an `AC`. Accumulators can handle more than `Int.MaxValue` elements. + */ +trait AccumulatorLike[@specialized(Double, Int, Long) A, AC] { + private[java8] var index: Int = 0 + private[java8] var hIndex: Int = 0 + private[java8] var totalSize: Long = 0L + private[java8] def cumulative(i: Int): Long + + private[java8] def nextBlockSize: Int = { + if (totalSize < 32) 16 + else if (totalSize <= Int.MaxValue) { + val bit = (64 - java.lang.Long.numberOfLeadingZeros(totalSize)) + 1 << (bit - (bit >> 2)) + } + else 1 << 24 + } + + /** Size of the accumulated collection, as a `Long` */ + final def size = totalSize + + /** Remove all accumulated elements from this accumulator. */ + def clear(): Unit = { + index = 0 + hIndex = 0 + totalSize = 0L + } + + private[java8] def seekSlot(ix: Long): Long = { + var lo = -1 + var hi = hIndex + while (lo + 1 < hi) { + val m = (lo + hi) >>> 1 // Shift allows division-as-unsigned, prevents overflow + if (cumulative(m) > ix) hi = m + else lo = m + } + (hi.toLong << 32) | (if (hi==0) ix else (ix - cumulative(hi-1))).toInt + } +} diff --git a/src/main/scala/scala/compat/java8/collectionCompat/DoubleAccumulator.scala b/src/main/scala/scala/compat/java8/collectionCompat/DoubleAccumulator.scala new file mode 100644 index 0000000..50b9d47 --- /dev/null +++ b/src/main/scala/scala/compat/java8/collectionCompat/DoubleAccumulator.scala @@ -0,0 +1,327 @@ +package scala.compat.java8.collectionCompat + +/** A `DoubleAccumulator` is a low-level collection specialized for gathering + * elements in parallel and then joining them in order by merging them. + * This is a manually specialized variant of `Accumulator` with no actual + * subclassing relationship with `Accumulator`. + */ +final class DoubleAccumulator +extends AccumulatorLike[Double, DoubleAccumulator] +{ self => + private[java8] var current: Array[Double] = DoubleAccumulator.emptyDoubleArray + private[java8] var history: Array[Array[Double]] = DoubleAccumulator.emptyDoubleArrayArray + + private[java8] def cumulative(i: Int) = { val x = history(i); x(x.length-1).toLong } + + private def expand(): Unit = { + if (index > 0) { + current(current.length-1) = (if (hIndex > 0) { val x = history(hIndex-1); x(x.length-1) } else 0) + index + if (hIndex >= history.length) hExpand() + history(hIndex) = current + hIndex += 1 + } + current = new Array[Double](nextBlockSize+1) + index = 0 + } + + private def hExpand(): Unit = { + if (hIndex == 0) history = new Array[Array[Double]](4) + else history = java.util.Arrays.copyOf(history, history.length << 1) + } + + /** Appends an element to this `DoubleAccumulator`. */ + final def +=(a: Double): Unit = { + totalSize += 1 + if (index+1 >= current.length) expand() + current(index) = a + index += 1 + } + + /** Removes all elements from `that` and appends them to this `DoubleAccumulator`. */ + final def drain(that: DoubleAccumulator): Unit = { + var h = 0 + var prev = 0L + var more = true + while (more && h < that.hIndex) { + val cuml = that.cumulative(h) + val n = (cuml - prev).toInt + if (current.length - index - 1 >= n) { + System.arraycopy(that.history(h), 0, current, index, n) + prev = cuml + index += n + h += 1 + } + else more = false + } + if (h >= that.hIndex && current.length - index - 1>= that.index) { + if (that.index > 0) System.arraycopy(that.current, 0, current, index, that.index) + index += that.index + } + else { + val slots = (if (index > 0) 1 else 0) + that.hIndex - h + if (hIndex + slots > history.length) { + val n = math.max(4, 1 << (32 - java.lang.Integer.numberOfLeadingZeros(1 + hIndex + slots))) + history = java.util.Arrays.copyOf(history, n) + } + var pv = (if (hIndex > 0) cumulative(hIndex-1) else 0L) + if (index > 0) { + val x = + if (index < (current.length >>> 3) && current.length - 1 > 32) { + val ans = java.util.Arrays.copyOf(current, index + 1) + ans(ans.length - 1) = current(current.length - 1) + ans + } + else current + pv = pv + index + x(x.length - 1) = pv + history(hIndex) = x + hIndex += 1 + } + while (h < that.hIndex) { + val cuml = that.cumulative(h) + pv = pv + cuml - prev + prev = cuml + val x = that.history(h) + x(x.length - 1) = pv + history(hIndex) = x + h += 1 + hIndex += 1 + } + index = that.index + current = that.current + } + totalSize += that.totalSize + that.clear + } + + override def clear(): Unit = { + super.clear() + current = DoubleAccumulator.emptyDoubleArray + history = DoubleAccumulator.emptyDoubleArrayArray + } + + /** Retrieves the `ix`th element. */ + final def apply(ix: Long): Double = { + if (totalSize - ix <= index || hIndex == 0) current((ix - (totalSize - index)).toInt) + else { + val w = seekSlot(ix) + history((w >>> 32).toInt)((w & 0xFFFFFFFFL).toInt) + } + } + + /** Retrieves the `ix`th element, using an `Int` index. */ + final def apply(i: Int): Double = apply(i.toLong) + + /** Returns an `Iterator` over the contents of this `DoubleAccumulator`. The `Iterator` is not specialized. */ + final def iterator = new Iterator[Double] { + private var consumed = 0L + private var hix = 0 + private var ix = 0 + private var i = 0 + private var a: Array[Double] = DoubleAccumulator.emptyDoubleArray + @annotation.tailrec private def loadMore() { + if (hix >= self.hIndex) { + a = self.current + val pv = (if (self.hIndex == 0) 0L else self.cumulative(self.hIndex-1)) + ix = self.index + i = (consumed - pv).toInt + } + else { + val ch = self.cumulative(hix) + if (ch > consumed) { + a = self.history(hix) + val pv = (if (hix == 0) 0L else self.cumulative(hix-1)) + ix = (ch - pv).toInt + i = (consumed - pv).toInt + } + else { + hix += 1 + loadMore() + } + } + } + def hasNext = consumed < self.totalSize + def next = { + if (i >= ix) loadMore() + val ans = a(i) + i += 1 + consumed += 1 + ans + } + } + + /** Returns a `java.util.Spliterator.OfDouble` over the contents of this `DoubleAccumulator`*/ + final def spliterator: java.util.Spliterator.OfDouble = new DoubleAccumulatorSpliterator(this) + + /** Produces a sequential Java 8 `DoubleStream` over the elements of this `DoubleAccumulator`*/ + final def seqStream: java.util.stream.DoubleStream = java.util.stream.StreamSupport.doubleStream(spliterator, false) + + /** Produces a parallel Java 8 `DoubleStream` over the elements of this `DoubleAccumulator`*/ + final def parStream: java.util.stream.DoubleStream = java.util.stream.StreamSupport.doubleStream(spliterator, true) + + /** Copies the elements in this `DoubleAccumulator` into an `Array[Double]` */ + final def toArray = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for an array: "+totalSize.toString) + val a = new Array[Double](totalSize.toInt) + var j = 0 + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val cuml = x(x.length-1).toLong + val n = (cuml - pv).toInt + pv = cuml + System.arraycopy(x, 0, a, j, n) + j += n + h += 1 + } + System.arraycopy(current, 0, a, j, index) + j += index + a + } + + /** Copies the elements in this `DoubleAccumulator` to a `List` */ + final def toList: List[Double] = { + var ans: List[Double] = Nil + var i = index - 1 + while (i >= 0) { + ans = current(i) :: ans + i -= 1 + } + var h = hIndex - 1 + while (h >= 0) { + val a = history(h) + i = (cumulative(h) - (if (h == 0) 0L else cumulative(h-1))).toInt - 1 + while (i >= 0) { + ans = a(i) :: ans + i -= 1 + } + h -= 1 + } + ans + } + + /** Copies the elements in this `DoubleAccumulator` to a specified collection. + * Note that the target collection is not specialized. + * Usage example: `acc.to[Vector]` + */ + final def to[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, Double, Coll[Double]]): Coll[Double] = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for a Scala collection: "+totalSize.toString) + val b = cbf() + b.sizeHint(totalSize.toInt) + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val n = cumulative(h) - pv + pv = cumulative(h) + var i = 0 + while (i < n) { + b += x(i) + i += 1 + } + h += 1 + } + var i = 0 + while (i < index) { + b += current(i) + i += 1 + } + b.result + } +} +object DoubleAccumulator { + private val emptyDoubleArray = new Array[Double](0) + private val emptyDoubleArrayArray = new Array[Array[Double]](0) + + /** A `Supplier` of `DoubleAccumulator`s, suitable for use with `java.util.stream.DoubleStream`'s `collect` method. Suitable for `Stream[Double]` also. */ + def supplier = new java.util.function.Supplier[DoubleAccumulator]{ def get: DoubleAccumulator = new DoubleAccumulator } + + /** A `BiConsumer` that adds an element to an `Accumulator`, suitable for use with `java.util.stream.DoubleStream`'s `collect` method. */ + def adder = new java.util.function.ObjDoubleConsumer[DoubleAccumulator]{ def accept(ac: DoubleAccumulator, a: Double) { ac += a } } + + /** A `BiConsumer` that adds a boxed `Double` to an `DoubleAccumulator`, suitable for use with `java.util.stream.Stream`'s `collect` method. */ + def boxedAdder = new java.util.function.BiConsumer[DoubleAccumulator, Double]{ def accept(ac: DoubleAccumulator, a: Double) { ac += a } } + + /** A `BiConsumer` that merges `DoubleAccumulator`s, suitable for use with `java.util.stream.DoubleStream`'s `collect` method. Suitable for `Stream[Double]` also. */ + def merger = new java.util.function.BiConsumer[DoubleAccumulator, DoubleAccumulator]{ def accept(a1: DoubleAccumulator, a2: DoubleAccumulator) { a1 drain a2 } } +} + +private[java8] class DoubleAccumulatorSpliterator(private val acc: DoubleAccumulator) extends java.util.Spliterator.OfDouble { + import java.util.Spliterator._ + + private var h = 0 + private var i = 0 + private var a = if (acc.hIndex > 0) acc.history(0) else acc.current + private var n = if (acc.hIndex > 0) acc.cumulative(0) else acc.index + private var N = acc.totalSize + + private def duplicateSelf(limit: Long = N): DoubleAccumulatorSpliterator = { + val ans = new DoubleAccumulatorSpliterator(acc) + ans.h = h + ans.i = i + ans.a = a + ans.n = n + ans.N = limit + ans + } + + private def loadMore(): Unit = { + h += 1 + if (h < acc.hIndex) { a = acc.history(h); n = acc.cumulative(h) - acc.cumulative(h-1) } + else { a = acc.current; n = acc.index } + i = 0 + } + + def characteristics = ORDERED | SIZED | SUBSIZED + + def estimateSize = N + + override def forEachRemaining(f: java.util.function.DoubleConsumer) { + while (N > 0) { + if (i >= n) loadMore() + val i0 = i + if ((n-i) > N) n = i + N.toInt + while (i < n) { + f.accept(a(i)) + i += 1 + } + N -= (n - i0) + } + } + + def tryAdvance(f: java.util.function.DoubleConsumer): Boolean = + if (N <= 0) false + else { + if (i >= n) loadMore() + f.accept(a(i)) + i += 1 + N -= 1 + true + } + + def trySplit: java.util.Spliterator.OfDouble = + if (N <= 1) null + else { + val half = (N >> 1) + val M = (if (h <= 0) 0L else acc.cumulative(h-1)) + i + val R = M + half + val ans = duplicateSelf(half) + if (h < acc.hIndex) { + val w = acc.seekSlot(R) + h = (w >>> 32).toInt + if (h < acc.hIndex) { + a = acc.history(h) + n = acc.cumulative(h) - (if (h > 0) acc.cumulative(h-1) else 0) + } + else { + a = acc.current + n = acc.index + } + i = (w & 0xFFFFFFFFL).toInt + } + else i += half.toInt + N -= half + ans + } +} diff --git a/src/main/scala/scala/compat/java8/collectionCompat/IntAccumulator.scala b/src/main/scala/scala/compat/java8/collectionCompat/IntAccumulator.scala new file mode 100644 index 0000000..ea658a3 --- /dev/null +++ b/src/main/scala/scala/compat/java8/collectionCompat/IntAccumulator.scala @@ -0,0 +1,333 @@ +package scala.compat.java8.collectionCompat + +/** A `IntAccumulator` is a low-level collection specialized for gathering + * elements in parallel and then joining them in order by merging them. + * This is a manually specialized variant of `Accumulator` with no actual + * subclassing relationship with `Accumulator`. + */ +final class IntAccumulator +extends AccumulatorLike[Int, IntAccumulator] +{ self => + private[java8] var current: Array[Int] = IntAccumulator.emptyIntArray + private[java8] var history: Array[Array[Int]] = IntAccumulator.emptyIntArrayArray + + private[java8] def cumulative(i: Int) = { val x = history(i); x(x.length-2).toLong << 32 | (x(x.length-1)&0xFFFFFFFFL) } + + private def expand(): Unit = { + if (index > 0) { + val cuml = (if (hIndex > 0) cumulative(hIndex-1) else 0) + index + current(current.length-2) = (cuml >>> 32).toInt + current(current.length-1) = (cuml & 0xFFFFFFFFL).toInt + if (hIndex >= history.length) hExpand() + history(hIndex) = current + hIndex += 1 + } + current = new Array[Int](nextBlockSize+1) + index = 0 + } + + private def hExpand(): Unit = { + if (hIndex == 0) history = new Array[Array[Int]](4) + else history = java.util.Arrays.copyOf(history, history.length << 1) + } + + /** Appends an element to this `IntAccumulator`. */ + final def +=(a: Int): Unit = { + totalSize += 1 + if (index+2 >= current.length) expand() + current(index) = a + index += 1 + } + + /** Removes all elements from `that` and appends them to this `IntAccumulator`. */ + final def drain(that: IntAccumulator): Unit = { + var h = 0 + var prev = 0L + var more = true + while (more && h < that.hIndex) { + val cuml = that.cumulative(h) + val n = (cuml - prev).toInt + if (current.length - index - 2 >= n) { + System.arraycopy(that.history(h), 0, current, index, n) + prev = cuml + index += n + h += 1 + } + else more = false + } + if (h >= that.hIndex && current.length - index - 2 >= that.index) { + if (that.index > 0) System.arraycopy(that.current, 0, current, index, that.index) + index += that.index + } + else { + val slots = (if (index > 0) 1 else 0) + that.hIndex - h + if (hIndex + slots > history.length) { + val n = math.max(4, 1 << (32 - java.lang.Integer.numberOfLeadingZeros(1 + hIndex + slots))) + history = java.util.Arrays.copyOf(history, n) + } + var pv = (if (hIndex > 0) cumulative(hIndex-1) else 0L) + if (index > 0) { + val x = + if (index < (current.length >>> 3) && current.length - 1 > 32) { + val ans = java.util.Arrays.copyOf(current, index + 2) + ans(ans.length - 2) = current(current.length - 2) + ans(ans.length - 1) = current(current.length - 1) + ans + } + else current + pv = pv + index + x(x.length - 2) = (pv >>> 32).toInt + x(x.length - 1) = (pv & 0xFFFFFFFFL).toInt + history(hIndex) = x + hIndex += 1 + } + while (h < that.hIndex) { + val cuml = that.cumulative(h) + pv = pv + cuml - prev + prev = cuml + val x = that.history(h) + x(x.length - 2) = (pv >>> 32).toInt + x(x.length - 1) = (pv & 0xFFFFFFFFL).toInt + history(hIndex) = x + h += 1 + hIndex += 1 + } + index = that.index + current = that.current + } + totalSize += that.totalSize + that.clear + } + + override def clear(): Unit = { + super.clear() + current = IntAccumulator.emptyIntArray + history = IntAccumulator.emptyIntArrayArray + } + + /** Retrieves the `ix`th element. */ + final def apply(ix: Long): Int = { + if (totalSize - ix <= index || hIndex == 0) current((ix - (totalSize - index)).toInt) + else { + val w = seekSlot(ix) + history((w >>> 32).toInt)((w & 0xFFFFFFFFL).toInt) + } + } + + /** Retrieves the `ix`th element, using an `Int` index. */ + final def apply(i: Int): Int = apply(i.toLong) + + /** Returns an `Iterator` over the contents of this `IntAccumulator`. The `Iterator` is not specialized. */ + final def iterator = new Iterator[Int] { + private var consumed = 0L + private var hix = 0 + private var ix = 0 + private var i = 0 + private var a: Array[Int] = IntAccumulator.emptyIntArray + @annotation.tailrec private def loadMore() { + if (hix >= self.hIndex) { + a = self.current + val pv = (if (self.hIndex == 0) 0L else self.cumulative(self.hIndex-1)) + ix = self.index + i = (consumed - pv).toInt + } + else { + val ch = self.cumulative(hix) + if (ch > consumed) { + a = self.history(hix) + val pv = (if (hix == 0) 0L else self.cumulative(hix-1)) + ix = (ch - pv).toInt + i = (consumed - pv).toInt + } + else { + hix += 1 + loadMore() + } + } + } + def hasNext = consumed < self.totalSize + def next = { + if (i >= ix) loadMore() + val ans = a(i) + i += 1 + consumed += 1 + ans + } + } + + /** Returns a `java.util.Spliterator.OfInt` over the contents of this `IntAccumulator`*/ + final def spliterator: java.util.Spliterator.OfInt = new IntAccumulatorSpliterator(this) + + /** Produces a sequential Java 8 `IntStream` over the elements of this `IntAccumulator`*/ + final def seqStream: java.util.stream.IntStream = java.util.stream.StreamSupport.intStream(spliterator, false) + + /** Produces a parallel Java 8 `IntStream` over the elements of this `IntAccumulator`*/ + final def parStream: java.util.stream.IntStream = java.util.stream.StreamSupport.intStream(spliterator, true) + + /** Copies the elements in this `IntAccumulator` into an `Array[Int]` */ + final def toArray = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for an array: "+totalSize.toString) + val a = new Array[Int](totalSize.toInt) + var j = 0 + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val cuml = cumulative(h) + val n = (cuml - pv).toInt + pv = cuml + System.arraycopy(x, 0, a, j, n) + j += n + h += 1 + } + System.arraycopy(current, 0, a, j, index) + j += index + a + } + + /** Copies the elements in this `IntAccumulator` to a `List` */ + final def toList: List[Int] = { + var ans: List[Int] = Nil + var i = index - 1 + while (i >= 0) { + ans = current(i) :: ans + i -= 1 + } + var h = hIndex - 1 + while (h >= 0) { + val a = history(h) + i = (cumulative(h) - (if (h == 0) 0L else cumulative(h-1))).toInt - 1 + while (i >= 0) { + ans = a(i) :: ans + i -= 1 + } + h -= 1 + } + ans + } + + /** Copies the elements in this `IntAccumulator` to a specified collection. + * Note that the target collection is not specialized. + * Usage example: `acc.to[Vector]` + */ + final def to[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, Int, Coll[Int]]): Coll[Int] = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for a Scala collection: "+totalSize.toString) + val b = cbf() + b.sizeHint(totalSize.toInt) + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val cuml = cumulative(h) + val n = cuml - pv + pv = cuml + var i = 0 + while (i < n) { + b += x(i) + i += 1 + } + h += 1 + } + var i = 0 + while (i < index) { + b += current(i) + i += 1 + } + b.result + } +} +object IntAccumulator { + private val emptyIntArray = new Array[Int](0) + private val emptyIntArrayArray = new Array[Array[Int]](0) + + /** A `Supplier` of `IntAccumulator`s, suitable for use with `java.util.stream.IntStream`'s `collect` method. Suitable for `Stream[Int]` also. */ + def supplier = new java.util.function.Supplier[IntAccumulator]{ def get: IntAccumulator = new IntAccumulator } + + /** A `BiConsumer` that adds an element to an `Accumulator`, suitable for use with `java.util.stream.IntStream`'s `collect` method. */ + def adder = new java.util.function.ObjIntConsumer[IntAccumulator]{ def accept(ac: IntAccumulator, a: Int) { ac += a } } + + /** A `BiConsumer` that adds a boxed `Int` to an `IntAccumulator`, suitable for use with `java.util.stream.Stream`'s `collect` method. */ + def boxedAdder = new java.util.function.BiConsumer[IntAccumulator, Int]{ def accept(ac: IntAccumulator, a: Int) { ac += a } } + + /** A `BiConsumer` that merges `IntAccumulator`s, suitable for use with `java.util.stream.IntStream`'s `collect` method. Suitable for `Stream[Int]` also. */ + def merger = new java.util.function.BiConsumer[IntAccumulator, IntAccumulator]{ def accept(a1: IntAccumulator, a2: IntAccumulator) { a1 drain a2 } } +} + +private[java8] class IntAccumulatorSpliterator(private val acc: IntAccumulator) extends java.util.Spliterator.OfInt { + import java.util.Spliterator._ + + private var h = 0 + private var i = 0 + private var a = if (acc.hIndex > 0) acc.history(0) else acc.current + private var n = if (acc.hIndex > 0) acc.cumulative(0) else acc.index + private var N = acc.totalSize + + private def duplicateSelf(limit: Long = N): IntAccumulatorSpliterator = { + val ans = new IntAccumulatorSpliterator(acc) + ans.h = h + ans.i = i + ans.a = a + ans.n = n + ans.N = limit + ans + } + + private def loadMore(): Unit = { + h += 1 + if (h < acc.hIndex) { a = acc.history(h); n = acc.cumulative(h) - acc.cumulative(h-1) } + else { a = acc.current; n = acc.index } + i = 0 + } + + def characteristics = ORDERED | SIZED | SUBSIZED + + def estimateSize = N + + override def forEachRemaining(f: java.util.function.IntConsumer) { + while (N > 0) { + if (i >= n) loadMore() + val i0 = i + if ((n-i) > N) n = i + N.toInt + while (i < n) { + f.accept(a(i)) + i += 1 + } + N -= (n - i0) + } + } + + def tryAdvance(f: java.util.function.IntConsumer): Boolean = + if (N <= 0) false + else { + if (i >= n) loadMore() + f.accept(a(i)) + i += 1 + N -= 1 + true + } + + def trySplit: java.util.Spliterator.OfInt = + if (N <= 1) null + else { + val half = (N >> 1) + val M = (if (h <= 0) 0L else acc.cumulative(h-1)) + i + val R = M + half + val ans = duplicateSelf(half) + if (h < acc.hIndex) { + val w = acc.seekSlot(R) + h = (w >>> 32).toInt + if (h < acc.hIndex) { + a = acc.history(h) + n = acc.cumulative(h) - (if (h > 0) acc.cumulative(h-1) else 0) + } + else { + a = acc.current + n = acc.index + } + i = (w & 0xFFFFFFFFL).toInt + } + else i += half.toInt + N -= half + ans + } +} diff --git a/src/main/scala/scala/compat/java8/collectionCompat/LongAccumulator.scala b/src/main/scala/scala/compat/java8/collectionCompat/LongAccumulator.scala new file mode 100644 index 0000000..f976b08 --- /dev/null +++ b/src/main/scala/scala/compat/java8/collectionCompat/LongAccumulator.scala @@ -0,0 +1,327 @@ +package scala.compat.java8.collectionCompat + +/** A `LongAccumulator` is a low-level collection specialized for gathering + * elements in parallel and then joining them in order by merging them. + * This is a manually specialized variant of `Accumulator` with no actual + * subclassing relationship with `Accumulator`. + */ +final class LongAccumulator +extends AccumulatorLike[Long, LongAccumulator] +{ self => + private[java8] var current: Array[Long] = LongAccumulator.emptyLongArray + private[java8] var history: Array[Array[Long]] = LongAccumulator.emptyLongArrayArray + + private[java8] def cumulative(i: Int) = { val x = history(i); x(x.length-1) } + + private def expand(): Unit = { + if (index > 0) { + current(current.length-1) = (if (hIndex > 0) { val x = history(hIndex-1); x(x.length-1) } else 0) + index + if (hIndex >= history.length) hExpand() + history(hIndex) = current + hIndex += 1 + } + current = new Array[Long](nextBlockSize+1) + index = 0 + } + + private def hExpand(): Unit = { + if (hIndex == 0) history = new Array[Array[Long]](4) + else history = java.util.Arrays.copyOf(history, history.length << 1) + } + + /** Appends an element to this `LongAccumulator`. */ + final def +=(a: Long): Unit = { + totalSize += 1 + if (index+1 >= current.length) expand() + current(index) = a + index += 1 + } + + /** Removes all elements from `that` and appends them to this `LongAccumulator`. */ + final def drain(that: LongAccumulator): Unit = { + var h = 0 + var prev = 0L + var more = true + while (more && h < that.hIndex) { + val cuml = that.cumulative(h) + val n = (cuml - prev).toInt + if (current.length - index - 1 >= n) { + System.arraycopy(that.history(h), 0, current, index, n) + prev = cuml + index += n + h += 1 + } + else more = false + } + if (h >= that.hIndex && current.length - index - 1>= that.index) { + if (that.index > 0) System.arraycopy(that.current, 0, current, index, that.index) + index += that.index + } + else { + val slots = (if (index > 0) 1 else 0) + that.hIndex - h + if (hIndex + slots > history.length) { + val n = math.max(4, 1 << (32 - java.lang.Integer.numberOfLeadingZeros(1 + hIndex + slots))) + history = java.util.Arrays.copyOf(history, n) + } + var pv = (if (hIndex > 0) cumulative(hIndex-1) else 0L) + if (index > 0) { + val x = + if (index < (current.length >>> 3) && current.length - 1 > 32) { + val ans = java.util.Arrays.copyOf(current, index + 1) + ans(ans.length - 1) = current(current.length - 1) + ans + } + else current + pv = pv + index + x(x.length - 1) = pv + history(hIndex) = x + hIndex += 1 + } + while (h < that.hIndex) { + val cuml = that.cumulative(h) + pv = pv + cuml - prev + prev = cuml + val x = that.history(h) + x(x.length - 1) = pv + history(hIndex) = x + h += 1 + hIndex += 1 + } + index = that.index + current = that.current + } + totalSize += that.totalSize + that.clear + } + + override def clear(): Unit = { + super.clear() + current = LongAccumulator.emptyLongArray + history = LongAccumulator.emptyLongArrayArray + } + + /** Retrieves the `ix`th element. */ + final def apply(ix: Long): Long = { + if (totalSize - ix <= index || hIndex == 0) current((ix - (totalSize - index)).toInt) + else { + val w = seekSlot(ix) + history((w >>> 32).toInt)((w & 0xFFFFFFFFL).toInt) + } + } + + /** Retrieves the `ix`th element, using an `Int` index. */ + final def apply(i: Int): Long = apply(i.toLong) + + /** Returns an `Iterator` over the contents of this `LongAccumulator`. The `Iterator` is not specialized. */ + final def iterator = new Iterator[Long] { + private var consumed = 0L + private var hix = 0 + private var ix = 0 + private var i = 0 + private var a: Array[Long] = LongAccumulator.emptyLongArray + @annotation.tailrec private def loadMore() { + if (hix >= self.hIndex) { + a = self.current + val pv = (if (self.hIndex == 0) 0L else self.cumulative(self.hIndex-1)) + ix = self.index + i = (consumed - pv).toInt + } + else { + val ch = self.cumulative(hix) + if (ch > consumed) { + a = self.history(hix) + val pv = (if (hix == 0) 0L else self.cumulative(hix-1)) + ix = (ch - pv).toInt + i = (consumed - pv).toInt + } + else { + hix += 1 + loadMore() + } + } + } + def hasNext = consumed < self.totalSize + def next = { + if (i >= ix) loadMore() + val ans = a(i) + i += 1 + consumed += 1 + ans + } + } + + /** Returns a `java.util.Spliterator.OfLong` over the contents of this `LongAccumulator`*/ + final def spliterator: java.util.Spliterator.OfLong = new LongAccumulatorSpliterator(this) + + /** Produces a sequential Java 8 `LongStream` over the elements of this `LongAccumulator`*/ + final def seqStream: java.util.stream.LongStream = java.util.stream.StreamSupport.longStream(spliterator, false) + + /** Produces a parallel Java 8 `LongStream` over the elements of this `LongAccumulator`*/ + final def parStream: java.util.stream.LongStream = java.util.stream.StreamSupport.longStream(spliterator, true) + + /** Copies the elements in this `LongAccumulator` into an `Array[Long]` */ + final def toArray = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for an array: "+totalSize.toString) + val a = new Array[Long](totalSize.toInt) + var j = 0 + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val cuml = x(x.length-1) + val n = (cuml - pv).toInt + pv = cuml + System.arraycopy(x, 0, a, j, n) + j += n + h += 1 + } + System.arraycopy(current, 0, a, j, index) + j += index + a + } + + /** Copies the elements in this `LongAccumulator` to a `List` */ + final def toList: List[Long] = { + var ans: List[Long] = Nil + var i = index - 1 + while (i >= 0) { + ans = current(i) :: ans + i -= 1 + } + var h = hIndex - 1 + while (h >= 0) { + val a = history(h) + i = (cumulative(h) - (if (h == 0) 0L else cumulative(h-1))).toInt - 1 + while (i >= 0) { + ans = a(i) :: ans + i -= 1 + } + h -= 1 + } + ans + } + + /** Copies the elements in this `LongAccumulator` to a specified collection. + * Note that the target collection is not specialized. + * Usage example: `acc.to[Vector]` + */ + final def to[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, Long, Coll[Long]]): Coll[Long] = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for a Scala collection: "+totalSize.toString) + val b = cbf() + b.sizeHint(totalSize.toInt) + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val n = cumulative(h) - pv + pv = cumulative(h) + var i = 0 + while (i < n) { + b += x(i) + i += 1 + } + h += 1 + } + var i = 0 + while (i < index) { + b += current(i) + i += 1 + } + b.result + } +} +object LongAccumulator { + private val emptyLongArray = new Array[Long](0) + private val emptyLongArrayArray = new Array[Array[Long]](0) + + /** A `Supplier` of `LongAccumulator`s, suitable for use with `java.util.stream.LongStream`'s `collect` method. Suitable for `Stream[Long]` also. */ + def supplier = new java.util.function.Supplier[LongAccumulator]{ def get: LongAccumulator = new LongAccumulator } + + /** A `BiConsumer` that adds an element to an `Accumulator`, suitable for use with `java.util.stream.LongStream`'s `collect` method. */ + def adder = new java.util.function.ObjLongConsumer[LongAccumulator]{ def accept(ac: LongAccumulator, a: Long) { ac += a } } + + /** A `BiConsumer` that adds a boxed `Long` to an `LongAccumulator`, suitable for use with `java.util.stream.Stream`'s `collect` method. */ + def boxedAdder = new java.util.function.BiConsumer[LongAccumulator, Long]{ def accept(ac: LongAccumulator, a: Long) { ac += a } } + + /** A `BiConsumer` that merges `LongAccumulator`s, suitable for use with `java.util.stream.LongStream`'s `collect` method. Suitable for `Stream[Long]` also. */ + def merger = new java.util.function.BiConsumer[LongAccumulator, LongAccumulator]{ def accept(a1: LongAccumulator, a2: LongAccumulator) { a1 drain a2 } } +} + +private[java8] class LongAccumulatorSpliterator(private val acc: LongAccumulator) extends java.util.Spliterator.OfLong { + import java.util.Spliterator._ + + private var h = 0 + private var i = 0 + private var a = if (acc.hIndex > 0) acc.history(0) else acc.current + private var n = if (acc.hIndex > 0) acc.cumulative(0) else acc.index + private var N = acc.totalSize + + private def duplicateSelf(limit: Long = N): LongAccumulatorSpliterator = { + val ans = new LongAccumulatorSpliterator(acc) + ans.h = h + ans.i = i + ans.a = a + ans.n = n + ans.N = limit + ans + } + + private def loadMore(): Unit = { + h += 1 + if (h < acc.hIndex) { a = acc.history(h); n = acc.cumulative(h) - acc.cumulative(h-1) } + else { a = acc.current; n = acc.index } + i = 0 + } + + def characteristics = ORDERED | SIZED | SUBSIZED + + def estimateSize = N + + override def forEachRemaining(f: java.util.function.LongConsumer) { + while (N > 0) { + if (i >= n) loadMore() + val i0 = i + if ((n-i) > N) n = i + N.toInt + while (i < n) { + f.accept(a(i)) + i += 1 + } + N -= (n - i0) + } + } + + def tryAdvance(f: java.util.function.LongConsumer): Boolean = + if (N <= 0) false + else { + if (i >= n) loadMore() + f.accept(a(i)) + i += 1 + N -= 1 + true + } + + def trySplit: java.util.Spliterator.OfLong = + if (N <= 1) null + else { + val half = (N >> 1) + val M = (if (h <= 0) 0L else acc.cumulative(h-1)) + i + val R = M + half + val ans = duplicateSelf(half) + if (h < acc.hIndex) { + val w = acc.seekSlot(R) + h = (w >>> 32).toInt + if (h < acc.hIndex) { + a = acc.history(h) + n = acc.cumulative(h) - (if (h > 0) acc.cumulative(h-1) else 0) + } + else { + a = acc.current + n = acc.index + } + i = (w & 0xFFFFFFFFL).toInt + } + else i += half.toInt + N -= half + ans + } +} diff --git a/src/test/scala/scala/compat/java8/StreamConvertersTest.scala b/src/test/scala/scala/compat/java8/StreamConvertersTest.scala new file mode 100644 index 0000000..17d65eb --- /dev/null +++ b/src/test/scala/scala/compat/java8/StreamConvertersTest.scala @@ -0,0 +1,145 @@ +package scala.compat.java8 + +import org.junit.Test +import org.junit.Assert._ + +class StreamConvertersTest { + import java.util.stream._ + import StreamConverters._ + + def assertEq[A](a1: A, a2: A, s: String) { assertEquals(s, a1, a2) } // Weird order normally! + def assertEq[A](a1: A, a2: A) { assertEq(a1, a2, "not equal") } + + def arrayO(n: Int) = (1 to n).map(_.toString).toArray + def arrayD(n: Int) = (1 to n).map(_.toDouble).toArray + def arrayI(n: Int) = (1 to n).toArray + def arrayL(n: Int) = (1 to n).map(_.toLong).toArray + + def newStream(n: Int) = java.util.Arrays.stream(arrayO(n)) + def newDoubleStream(n: Int) = java.util.Arrays.stream(arrayD(n)) + def newIntStream(n: Int) = java.util.Arrays.stream(arrayI(n)) + def newLongStream(n: Int) = java.util.Arrays.stream(arrayL(n)) + + val ns = Vector(0, 1, 2, 12, 15, 16, 17, 31, 32, 33, 151, 1298, 7159) + + @Test + def streamAccumulate() { + for (n <- ns) { + val vecO = arrayO(n).toVector + val accO = newStream(n).parallel.accumulate + assertEq(vecO, newStream(n).accumulate.to[Vector], s"stream $n to vector") + assertEq(vecO, accO.to[Vector], s"stream $n to vector in parallel") + assertEq(vecO, accO.toArray.toVector, s"stream $n to vector via array in parallel") + assertEq(vecO, accO.iterator.toVector, s"stream $n to vector via iterator in parallel") + assertEq(vecO, accO.toList.toVector, s"stream $n to vector via list in parallel") + assert((0 until accO.size.toInt).forall(i => vecO(i) == accO(i)), s"stream $n indexed via accumulator") + assert(accO.isInstanceOf[scala.compat.java8.collectionCompat.Accumulator[_]], s"stream $n to generic accumulator") + + for (boxless <- Seq(false, true)) { + val sbox = (if (boxless) "" else "(boxed)") + val vecD = arrayD(n).toVector + val accD = + if (boxless) newDoubleStream(n).parallel.accumulate + else newDoubleStream(n).boxed.parallel.accumulatePrimitive + assertEq(vecD, newDoubleStream(n).accumulate.to[Vector], s"double stream $n to vector $sbox") + assertEq(vecD, accD.to[Vector], s"double stream $n to vector in parallel $sbox") + assertEq(vecD, accD.toArray.toVector, s"double stream $n to vector via array in parallel $sbox") + assertEq(vecD, accD.iterator.toVector, s"double stream $n to vector via iterator in parallel $sbox") + assertEq(vecD, accD.toList.toVector, s"double stream $n to vector via list in parallel $sbox") + assert((0 until accD.size.toInt).forall(i => vecD(i) == accD(i)), s"double stream $n indexed via accumulator $sbox") + assert(accD.isInstanceOf[scala.compat.java8.collectionCompat.DoubleAccumulator], s"double stream $n to generic accumulator $sbox") + + val vecI = arrayI(n).toVector + val accI = + if (boxless) newIntStream(n).parallel.accumulate + else newIntStream(n).boxed.parallel.accumulatePrimitive + assertEq(vecI, newIntStream(n).accumulate.to[Vector], s"int stream $n to vector $sbox") + assertEq(vecI, accI.to[Vector], s"int stream $n to vector in parallel $sbox") + assertEq(vecI, accI.toArray.toVector, s"int stream $n to vector via array in parallel $sbox") + assertEq(vecI, accI.iterator.toVector, s"int stream $n to vector via iterator in parallel $sbox") + assertEq(vecI, accI.toList.toVector, s"int stream $n to vector via list in parallel $sbox") + assert((0 until accI.size.toInt).forall(i => vecI(i) == accI(i)), s"int stream $n indexed via accumulator $sbox") + assert(accI.isInstanceOf[scala.compat.java8.collectionCompat.IntAccumulator], s"int stream $n to generic accumulator $sbox") + + val vecL = arrayL(n).toVector + val accL = + if (boxless) newLongStream(n).parallel.accumulate + else newLongStream(n).boxed.parallel.accumulatePrimitive + assertEq(vecL, newLongStream(n).accumulate.to[Vector], s"long stream $n to vector $sbox") + assertEq(vecL, accL.to[Vector], s"long stream $n to vector in parallel $sbox") + assertEq(vecL, accL.toArray.toVector, s"long stream $n to vector via array in parallel $sbox") + assertEq(vecL, accL.iterator.toVector, s"long stream $n to vector via iterator in parallel $sbox") + assertEq(vecL, accL.toList.toVector, s"long stream $n to vector via list in parallel $sbox") + assert((0 until accL.size.toInt).forall(i => vecL(i) == accL(i)), s"long stream $n indexed via accumulator $sbox") + assert(accL.isInstanceOf[scala.compat.java8.collectionCompat.LongAccumulator], s"long stream $n to generic accumulator $sbox") + } + } + } + + @Test + def streamToScala() { + for (n <- ns) { + val vecO = arrayO(n).toVector + assertEq(vecO, newStream(n).toScala[Vector]) + assertEq(vecO, newStream(n).parallel.toScala[Vector]) + + val vecD = arrayD(n).toVector + assertEq(vecD, newDoubleStream(n).toScala[Vector]) + assertEq(vecD, newDoubleStream(n).parallel.toScala[Vector]) + + val vecI = arrayI(n).toVector + assertEq(vecI, newIntStream(n).toScala[Vector]) + assertEq(vecI, newIntStream(n).parallel.toScala[Vector]) + + val vecL = arrayL(n).toVector + assertEq(vecL, newLongStream(n).toScala[Vector]) + assertEq(vecL, newLongStream(n).parallel.toScala[Vector]) + } + } + + @Test + def streamUnbox() { + assert(newDoubleStream(1).boxed.unboxed.isInstanceOf[DoubleStream]) + assert(newIntStream(1).boxed.unboxed.isInstanceOf[IntStream]) + assert(newLongStream(1).boxed.unboxed.isInstanceOf[LongStream]) + } + + @Test + def scalaToStream() { + for (n <- ns) { + val arrO = arrayO(n) + val seqO = arrO.toSeq + assertEq(seqO, seqO.seqStream.toScala[Seq]) + assertEq(seqO, seqO.parStream.toScala[Seq]) + assertEq(seqO, arrO.seqStream.toScala[Seq]) + assertEq(seqO, arrO.parStream.toScala[Seq]) + + val arrD = arrayD(n) + val seqD = arrD.toSeq + assertEq(seqD, seqD.seqStream.toScala[Seq]) + assertEq(seqD, seqD.parStream.toScala[Seq]) + assertEq(seqD, arrD.seqStream.toScala[Seq]) + assertEq(seqD, arrD.parStream.toScala[Seq]) + assert(arrD.seqStream.isInstanceOf[DoubleStream]) + assert(arrD.parStream.isInstanceOf[DoubleStream]) + + val arrI = arrayI(n) + val seqI = arrI.toSeq + assertEq(seqI, seqI.seqStream.toScala[Seq]) + assertEq(seqI, seqI.parStream.toScala[Seq]) + assertEq(seqI, arrI.seqStream.toScala[Seq]) + assertEq(seqI, arrI.parStream.toScala[Seq]) + assert(arrI.seqStream.isInstanceOf[IntStream]) + assert(arrI.parStream.isInstanceOf[IntStream]) + + val arrL = arrayL(n) + val seqL = arrL.toSeq + assertEq(seqL, seqL.seqStream.toScala[Seq]) + assertEq(seqL, seqL.parStream.toScala[Seq]) + assertEq(seqL, arrL.seqStream.toScala[Seq]) + assertEq(seqL, arrL.parStream.toScala[Seq]) + assert(arrL.seqStream.isInstanceOf[LongStream]) + assert(arrL.parStream.isInstanceOf[LongStream]) + } + } +}