diff --git a/src/main/scala/scala/compat/java8/StreamConverters.scala b/src/main/scala/scala/compat/java8/StreamConverters.scala new file mode 100644 index 0000000..1856d71 --- /dev/null +++ b/src/main/scala/scala/compat/java8/StreamConverters.scala @@ -0,0 +1,226 @@ +package scala.compat.java8 + +import language.implicitConversions + +import java.util.stream._ +import scala.compat.java8.collectionCompat._ + +trait PrimitiveStreamAccumulator[S, AA] { + def streamAccumulate(stream: S): AA +} + +trait PrimitiveStreamUnboxer[A, S] { + def apply(boxed: Stream[A]): S +} + +trait Priority2StreamConverters { + implicit class EnrichAnyScalaCollectionWithStream[A](t: TraversableOnce[A]) { + private def mkAcc() = { + val acc = new Accumulator[A] + t.foreach{ acc += _ } + acc + } + + def seqStream: Stream[A] = mkAcc().seqStream + + def parStream: Stream[A] = mkAcc().parStream + } + + implicit class EnrichMissingPrimitiveArrayWithStream[A](a: Array[A]) { + private def mkAcc() = { + val acc = new Accumulator[A] + var i = 0 + while (i < a.length) { + acc += a(i) + i += 1 + } + acc + } + + def seqStream: Stream[A] = mkAcc().seqStream + + def parStream: Stream[A] = mkAcc().parStream + } +} + +trait Priority1StreamConverters extends Priority2StreamConverters { + implicit class EnrichScalaCollectionWithStream[A <: AnyRef](t: TraversableOnce[A]) { + private def mkArr()(implicit tag: reflect.ClassTag[A]): Array[A] = { + if (t.isTraversableAgain && t.hasDefiniteSize) { + val sz = t.size + val a = new Array[A](sz) + t.copyToArray(a, 0, sz) + a + } + else t.toArray[A] + } + + def seqStream(implicit tag: reflect.ClassTag[A]): Stream[A] = + java.util.Arrays.stream(mkArr()) + + def parStream(implicit tag: reflect.ClassTag[A]): Stream[A] = seqStream.parallel + } + + implicit class EnrichGenericArrayWithStream[A <: AnyRef](a: Array[A]) { + def seqStream: Stream[A] = java.util.Arrays.stream(a) + def parStream: Stream[A] = seqStream.parallel + } + + implicit class RichStream[A](stream: Stream[A]) { + def accumulate = stream.collect(Accumulator.supplier[A], Accumulator.adder[A], Accumulator.merger[A]) + + def toScala[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, A, Coll[A]]): Coll[A] = { + if (stream.isParallel) accumulate.to[Coll](cbf) + else { + val b = cbf() + stream.forEachOrdered(new java.util.function.Consumer[A]{ def accept(a: A) { b += a } }) + b.result() + } + } + + def unboxed[S](implicit ubx: PrimitiveStreamUnboxer[A, S]): S = ubx(stream) + } + + implicit class RichStreamCanAccumulatePrimitive[S](stream: S) { + def accumulatePrimitive[AA](implicit psa: PrimitiveStreamAccumulator[S, AA]) = psa.streamAccumulate(stream) + } +} + +/** `StreamConverters` provides extension methods and other functionality to + * ease interoperability of Scala collections with `java.util.stream` classes. + * + * Scala collections gain extension methods `seqStream` and + * `parStream` that allow them to be used as the source of a `Stream`. + * + * `Array` also gains `seqStream` and `parStream` methods, and calling those + * on `Array[Double]`, `Array[Int]`, or `Array[Long]` will produce the + * corresponding primitive stream. + * + * Streams gain `accumulate` and `toScala[_]` methods, which collect the stream + * into a custom high-performance `scala.collection.mutable.java8.Accumulator`, + * which is not part of the standard collections hierarchy, or into a named + * Scala collection, respectively. + * + * Generic streams also gain an `unboxed` method that will convert to the + * corresponding unboxed primitive stream, if appropriate. Unboxed streams + * have custom accumulators with improved performance. + * + * Accumulators have `toArray`, `toList`, `iterator`, and `to[_]` methods + * to convert to standard Scala collections. + * + * Example: + * ``` + * import scala.compat.java8.StreamConverers._ + * + * val s = Vector(1,2,3,4).parStream // Stream[Int] + * val si = s.unboxed // Stream.OfInt + * val ai = si.accumulate // IntAccumulator + * val v = ai.to[Vector] // Vector[Int] again + * + * val t = Array(2.0, 3.0, 4.0).parStream // DoubleStream + * val q = t.toScala[scala.collection.immutable.Queue] // Queue[Double] + * ``` + */ +object StreamConverters extends Priority1StreamConverters { + implicit class EnrichDoubleArrayWithStream(a: Array[Double]) { + def seqStream: DoubleStream = java.util.Arrays.stream(a) + def parStream: DoubleStream = seqStream.parallel + } + + implicit class EnrichIntArrayWithStream(a: Array[Int]) { + def seqStream: IntStream = java.util.Arrays.stream(a) + def parStream: IntStream = seqStream.parallel + } + + implicit class EnrichLongArrayWithStream(a: Array[Long]) { + def seqStream: LongStream = java.util.Arrays.stream(a) + def parStream: LongStream = seqStream.parallel + } + + implicit val primitiveAccumulateDoubleStream = new PrimitiveStreamAccumulator[Stream[Double], DoubleAccumulator] { + def streamAccumulate(stream: Stream[Double]): DoubleAccumulator = + stream.collect(DoubleAccumulator.supplier, DoubleAccumulator.boxedAdder, DoubleAccumulator.merger) + } + + implicit val primitiveAccumulateDoubleStream2 = + primitiveAccumulateDoubleStream.asInstanceOf[PrimitiveStreamAccumulator[Stream[java.lang.Double], DoubleAccumulator]] + + implicit val primitiveUnboxDoubleStream = new PrimitiveStreamUnboxer[Double, DoubleStream] { + def apply(boxed: Stream[Double]): DoubleStream = + boxed.mapToDouble(new java.util.function.ToDoubleFunction[Double]{ def applyAsDouble(d: Double) = d }) + } + + implicit val primitiveUnboxDoubleStream2 = + primitiveUnboxDoubleStream.asInstanceOf[PrimitiveStreamUnboxer[java.lang.Double, DoubleStream]] + + implicit val primitiveAccumulateIntStream = new PrimitiveStreamAccumulator[Stream[Int], IntAccumulator] { + def streamAccumulate(stream: Stream[Int]): IntAccumulator = + stream.collect(IntAccumulator.supplier, IntAccumulator.boxedAdder, IntAccumulator.merger) + } + + implicit val primitiveAccumulateIntStream2 = + primitiveAccumulateIntStream.asInstanceOf[PrimitiveStreamAccumulator[Stream[java.lang.Integer], IntAccumulator]] + + implicit val primitiveUnboxIntStream = new PrimitiveStreamUnboxer[Int, IntStream] { + def apply(boxed: Stream[Int]): IntStream = + boxed.mapToInt(new java.util.function.ToIntFunction[Int]{ def applyAsInt(d: Int) = d }) + } + + implicit val primitiveUnboxIntStream2 = + primitiveUnboxIntStream.asInstanceOf[PrimitiveStreamUnboxer[java.lang.Integer, IntStream]] + + implicit val primitiveAccumulateLongStream = new PrimitiveStreamAccumulator[Stream[Long], LongAccumulator] { + def streamAccumulate(stream: Stream[Long]): LongAccumulator = + stream.collect(LongAccumulator.supplier, LongAccumulator.boxedAdder, LongAccumulator.merger) + } + + implicit val primitiveAccumulateLongStream2 = + primitiveAccumulateLongStream.asInstanceOf[PrimitiveStreamAccumulator[Stream[java.lang.Long], LongAccumulator]] + + implicit val primitiveUnboxLongStream = new PrimitiveStreamUnboxer[Long, LongStream] { + def apply(boxed: Stream[Long]): LongStream = + boxed.mapToLong(new java.util.function.ToLongFunction[Long]{ def applyAsLong(d: Long) = d }) + } + + implicit val primitiveUnboxLongStream2 = + primitiveUnboxLongStream.asInstanceOf[PrimitiveStreamUnboxer[java.lang.Long, LongStream]] + + implicit class RichDoubleStream(stream: DoubleStream) { + def accumulate = stream.collect(DoubleAccumulator.supplier, DoubleAccumulator.adder, DoubleAccumulator.merger) + + def toScala[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, Double, Coll[Double]]): Coll[Double] = { + if (stream.isParallel) accumulate.to[Coll](cbf) + else { + val b = cbf() + stream.forEachOrdered(new java.util.function.DoubleConsumer{ def accept(d: Double) { b += d } }) + b.result() + } + } + } + + implicit class RichIntStream(stream: IntStream) { + def accumulate = stream.collect(IntAccumulator.supplier, IntAccumulator.adder, IntAccumulator.merger) + + def toScala[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, Int, Coll[Int]]): Coll[Int] = { + if (stream.isParallel) accumulate.to[Coll](cbf) + else { + val b = cbf() + stream.forEachOrdered(new java.util.function.IntConsumer{ def accept(d: Int) { b += d } }) + b.result() + } + } + } + + implicit class RichLongStream(stream: LongStream) { + def accumulate = stream.collect(LongAccumulator.supplier, LongAccumulator.adder, LongAccumulator.merger) + + def toScala[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, Long, Coll[Long]]): Coll[Long] = { + if (stream.isParallel) accumulate.to[Coll](cbf) + else { + val b = cbf() + stream.forEachOrdered(new java.util.function.LongConsumer{ def accept(d: Long) { b += d } }) + b.result() + } + } + } +} diff --git a/src/main/scala/scala/compat/java8/collectionCompat/Accumulator.scala b/src/main/scala/scala/compat/java8/collectionCompat/Accumulator.scala new file mode 100644 index 0000000..de93806 --- /dev/null +++ b/src/main/scala/scala/compat/java8/collectionCompat/Accumulator.scala @@ -0,0 +1,331 @@ +package scala.compat.java8.collectionCompat + +/** An `Accumulator` is a low-level collection specialized for gathering + * elements in parallel and then joining them in order by merging Accumulators. + * Accumulators can contain more than `Int.MaxValue` elements. + */ +final class Accumulator[A] +extends AccumulatorLike[A, Accumulator[A]] +{ self => + private[java8] var current: Array[AnyRef] = Accumulator.emptyAnyRefArray + private[java8] var history: Array[Array[AnyRef]] = Accumulator.emptyAnyRefArrayArray + private[java8] var cumul: Array[Long] = Accumulator.emptyLongArray + + private[java8] def cumulative(i: Int) = cumul(i) + + private def expand(): Unit = { + if (index > 0) { + if (hIndex >= history.length) hExpand() + history(hIndex) = current + cumul(hIndex) = (if (hIndex > 0) cumulative(hIndex-1) else 0) + index + hIndex += 1 + } + current = new Array[AnyRef](nextBlockSize) + index = 0 + } + + private def hExpand(): Unit = { + if (hIndex == 0) { + history = new Array[Array[AnyRef]](4) + cumul = new Array[Long](4) + } + else { + history = java.util.Arrays.copyOf(history, history.length << 1) + cumul = java.util.Arrays.copyOf(cumul, cumul.length << 1) + } + } + + /** Appends an element to this `Accumulator`. */ + final def +=(a: A): Unit = { + totalSize += 1 + if (index >= current.length) expand() + current(index) = a.asInstanceOf[AnyRef] + index += 1 + } + + /** Removes all elements from `that` and appends them to this `Accumulator`. */ + final def drain[A1 <: A](that: Accumulator[A1]): Unit = { + var h = 0 + var prev = 0L + var more = true + while (more && h < that.hIndex) { + val n = (that.cumulative(h) - prev).toInt + if (current.length - index >= n) { + System.arraycopy(that.history(h), 0, current, index, n) + prev = that.cumulative(h) + index += n + h += 1 + } + else more = false + } + if (h >= that.hIndex && current.length - index >= that.index) { + if (that.index > 0) System.arraycopy(that.current, 0, current, index, that.index) + index += that.index + } + else { + val slots = (if (index > 0) 1 else 0) + that.hIndex - h + if (hIndex + slots > history.length) { + val n = math.max(4, 1 << (32 - java.lang.Integer.numberOfLeadingZeros(1 + hIndex + slots))) + history = java.util.Arrays.copyOf(history, n) + cumul = java.util.Arrays.copyOf(cumul, n) + } + var pv = (if (hIndex > 0) cumulative(hIndex-1) else 0L) + if (index > 0) { + pv += index + cumul(hIndex) = pv + history(hIndex) = (if (index < (current.length >>> 3) && current.length > 32) java.util.Arrays.copyOf(current, index) else current) + hIndex += 1 + } + while (h < that.hIndex) { + pv += that.cumulative(h) - prev + prev = that.cumulative(h) + cumul(hIndex) = pv + history(hIndex) = that.history(h) + h += 1 + hIndex += 1 + } + index = that.index + current = that.current + } + totalSize += that.totalSize + that.clear + } + + override def clear(): Unit = { + super.clear() + current = Accumulator.emptyAnyRefArray + history = Accumulator.emptyAnyRefArrayArray + cumul = Accumulator.emptyLongArray + } + + /** Retrieves the `ix`th element. */ + final def apply(ix: Long): A = { + if (totalSize - ix <= index || hIndex == 0) current((ix - (totalSize - index)).toInt).asInstanceOf[A] + else { + val w = seekSlot(ix) + history((w >>> 32).toInt)((w & 0xFFFFFFFFL).toInt).asInstanceOf[A] + } + } + + /** Retrieves the `ix`th element, using an `Int` index. */ + final def apply(i: Int): A = apply(i.toLong) + + /** Returns an `Iterator` over the contents of this `Accumulator`. */ + final def iterator = new Iterator[A] { + private var consumed = 0L + private var hix = 0 + private var ix = 0 + private var i = 0 + private var a: Array[AnyRef] = Accumulator.emptyAnyRefArray + @annotation.tailrec private def loadMore() { + if (hix >= self.hIndex) { + a = self.current + val pv = (if (self.hIndex == 0) 0L else self.cumulative(self.hIndex-1)) + ix = self.index + i = (consumed - pv).toInt + } + else { + val ch = self.cumulative(hix) + if (ch > consumed) { + a = self.history(hix) + val pv = (if (hix == 0) 0L else self.cumulative(hix-1)) + ix = (ch - pv).toInt + i = (consumed - pv).toInt + } + else { + hix += 1 + loadMore() + } + } + } + def hasNext = consumed < self.totalSize + def next = { + if (i >= ix) loadMore() + val ans = a(i) + i += 1 + consumed += 1 + ans.asInstanceOf[A] + } + } + + /** Returns a `java.util.Spliterator` over the contents of this `Accumulator`*/ + final def spliterator: java.util.Spliterator[A] = new AccumulatorSpliterator[A](this) + + /** Produces a sequential Java 8 Stream over the elements of this `Accumulator`*/ + final def seqStream: java.util.stream.Stream[A] = java.util.stream.StreamSupport.stream(spliterator, false) + + /** Produces a parallel Java 8 Stream over the elements of this `Accumulator`*/ + final def parStream: java.util.stream.Stream[A] = java.util.stream.StreamSupport.stream(spliterator, true) + + /** Copies the elements in this `Accumulator` into an `Array` */ + final def toArray(implicit tag: reflect.ClassTag[A]) = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for an array: "+totalSize.toString) + val a = new Array[A](totalSize.toInt) + var j = 0 + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val n = cumulative(h) - pv + pv = cumulative(h) + var i = 0 + while (i < n) { + a(j) = x(i).asInstanceOf[A] + i += 1 + j += 1 + } + h += 1 + } + var i = 0 + while (i < index) { + a(j) = current(i).asInstanceOf[A] + i += 1 + j += 1 + } + a + } + + /** Copies the elements in this `Accumulator` to a `List` */ + final def toList: List[A] = { + var ans: List[A] = Nil + var i = index - 1 + while (i >= 0) { + ans = current(i).asInstanceOf[A] :: ans + i -= 1 + } + var h = hIndex - 1 + while (h >= 0) { + val a = history(h) + i = (cumulative(h) - (if (h == 0) 0L else cumulative(h-1))).toInt - 1 + while (i >= 0) { + ans = a(i).asInstanceOf[A] :: ans + i -= 1 + } + h -= 1 + } + ans + } + + /** Copies the elements in this `Accumulator` to a specified collection. + * Usage example: `acc.to[Vector]` + */ + final def to[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, A, Coll[A]]): Coll[A] = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for a Scala collection: "+totalSize.toString) + val b = cbf() + b.sizeHint(totalSize.toInt) + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val n = cumulative(h) - pv + pv = cumulative(h) + var i = 0 + while (i < n) { + b += x(i).asInstanceOf[A] + i += 1 + } + h += 1 + } + var i = 0 + while (i < index) { + b += current(i).asInstanceOf[A] + i += 1 + } + b.result + } +} +object Accumulator { + private val emptyAnyRefArray = new Array[AnyRef](0) + private val emptyAnyRefArrayArray = new Array[Array[AnyRef]](0) + private val emptyLongArray = new Array[Long](0) + + /** A `Supplier` of `Accumulator`s, suitable for use with `java.util.stream.Stream`'s `collect` method. */ + def supplier[A] = new java.util.function.Supplier[Accumulator[A]]{ def get: Accumulator[A] = new Accumulator[A] } + + /** A `BiConsumer` that adds an element to an `Accumulator`, suitable for use with `java.util.stream.Stream`'s `collect` method. */ + def adder[A] = new java.util.function.BiConsumer[Accumulator[A], A]{ def accept(ac: Accumulator[A], a: A) { ac += a } } + + /** A `BiConsumer` that merges `Accumulator`s, suitable for use with `java.util.stream.Stream`'s `collect` method. */ + def merger[A] = new java.util.function.BiConsumer[Accumulator[A], Accumulator[A]]{ def accept(a1: Accumulator[A], a2: Accumulator[A]) { a1 drain a2 } } +} + +private[java8] class AccumulatorSpliterator[A](private val acc: Accumulator[A]) extends java.util.Spliterator[A] { + import java.util.Spliterator._ + + private var h = 0 + private var i = 0 + private var a = if (acc.hIndex > 0) acc.history(0) else acc.current + private var n = if (acc.hIndex > 0) acc.cumulative(0) else acc.index + private var N = acc.totalSize + + private def duplicateSelf(limit: Long = N): AccumulatorSpliterator[A] = { + val ans = new AccumulatorSpliterator(acc) + ans.h = h + ans.i = i + ans.a = a + ans.n = n + ans.N = limit + ans + } + + private def loadMore(): Unit = { + h += 1 + if (h < acc.hIndex) { a = acc.history(h); n = acc.cumulative(h) - acc.cumulative(h-1) } + else { a = acc.current; n = acc.index } + i = 0 + } + + def characteristics = ORDERED | SIZED | SUBSIZED + + def estimateSize = N + + override def forEachRemaining(f: java.util.function.Consumer[_ >: A]) { + while (N > 0) { + if (i >= n) loadMore() + val i0 = i + if ((n-i) > N) n = i + N.toInt + while (i < n) { + f.accept(a(i).asInstanceOf[A]) + i += 1 + } + N -= (n - i0) + } + } + + def tryAdvance(f: java.util.function.Consumer[_ >: A]): Boolean = + if (N <= 0) false + else { + if (i >= n) loadMore() + f.accept(a(i).asInstanceOf[A]) + i += 1 + N -= 1 + true + } + + def trySplit: java.util.Spliterator[A] = + if (N <= 1) null + else { + val half = (N >> 1) + val M = (if (h <= 0) 0L else acc.cumulative(h-1)) + i + val R = M + half + val ans = duplicateSelf(half) + if (h < acc.hIndex) { + val w = acc.seekSlot(R) + h = (w >>> 32).toInt + if (h < acc.hIndex) { + a = acc.history(h) + n = acc.cumulative(h) - (if (h > 0) acc.cumulative(h-1) else 0) + } + else { + a = acc.current + n = acc.index + } + i = (w & 0xFFFFFFFFL).toInt + } + else i += half.toInt + N -= half + ans + } + + override def toString = s"$h $i ${a.mkString("{",",","}")} $n $N" +} diff --git a/src/main/scala/scala/compat/java8/collectionCompat/AccumulatorLike.scala b/src/main/scala/scala/compat/java8/collectionCompat/AccumulatorLike.scala new file mode 100644 index 0000000..75de367 --- /dev/null +++ b/src/main/scala/scala/compat/java8/collectionCompat/AccumulatorLike.scala @@ -0,0 +1,41 @@ +package scala.compat.java8.collectionCompat + +/** An accumulator that works with Java 8 streams; it accepts elements of type `A`, + * is itself an `AC`. Accumulators can handle more than `Int.MaxValue` elements. + */ +trait AccumulatorLike[@specialized(Double, Int, Long) A, AC] { + private[java8] var index: Int = 0 + private[java8] var hIndex: Int = 0 + private[java8] var totalSize: Long = 0L + private[java8] def cumulative(i: Int): Long + + private[java8] def nextBlockSize: Int = { + if (totalSize < 32) 16 + else if (totalSize <= Int.MaxValue) { + val bit = (64 - java.lang.Long.numberOfLeadingZeros(totalSize)) + 1 << (bit - (bit >> 2)) + } + else 1 << 24 + } + + /** Size of the accumulated collection, as a `Long` */ + final def size = totalSize + + /** Remove all accumulated elements from this accumulator. */ + def clear(): Unit = { + index = 0 + hIndex = 0 + totalSize = 0L + } + + private[java8] def seekSlot(ix: Long): Long = { + var lo = -1 + var hi = hIndex + while (lo + 1 < hi) { + val m = (lo + hi) >>> 1 // Shift allows division-as-unsigned, prevents overflow + if (cumulative(m) > ix) hi = m + else lo = m + } + (hi.toLong << 32) | (if (hi==0) ix else (ix - cumulative(hi-1))).toInt + } +} diff --git a/src/main/scala/scala/compat/java8/collectionCompat/DoubleAccumulator.scala b/src/main/scala/scala/compat/java8/collectionCompat/DoubleAccumulator.scala new file mode 100644 index 0000000..50b9d47 --- /dev/null +++ b/src/main/scala/scala/compat/java8/collectionCompat/DoubleAccumulator.scala @@ -0,0 +1,327 @@ +package scala.compat.java8.collectionCompat + +/** A `DoubleAccumulator` is a low-level collection specialized for gathering + * elements in parallel and then joining them in order by merging them. + * This is a manually specialized variant of `Accumulator` with no actual + * subclassing relationship with `Accumulator`. + */ +final class DoubleAccumulator +extends AccumulatorLike[Double, DoubleAccumulator] +{ self => + private[java8] var current: Array[Double] = DoubleAccumulator.emptyDoubleArray + private[java8] var history: Array[Array[Double]] = DoubleAccumulator.emptyDoubleArrayArray + + private[java8] def cumulative(i: Int) = { val x = history(i); x(x.length-1).toLong } + + private def expand(): Unit = { + if (index > 0) { + current(current.length-1) = (if (hIndex > 0) { val x = history(hIndex-1); x(x.length-1) } else 0) + index + if (hIndex >= history.length) hExpand() + history(hIndex) = current + hIndex += 1 + } + current = new Array[Double](nextBlockSize+1) + index = 0 + } + + private def hExpand(): Unit = { + if (hIndex == 0) history = new Array[Array[Double]](4) + else history = java.util.Arrays.copyOf(history, history.length << 1) + } + + /** Appends an element to this `DoubleAccumulator`. */ + final def +=(a: Double): Unit = { + totalSize += 1 + if (index+1 >= current.length) expand() + current(index) = a + index += 1 + } + + /** Removes all elements from `that` and appends them to this `DoubleAccumulator`. */ + final def drain(that: DoubleAccumulator): Unit = { + var h = 0 + var prev = 0L + var more = true + while (more && h < that.hIndex) { + val cuml = that.cumulative(h) + val n = (cuml - prev).toInt + if (current.length - index - 1 >= n) { + System.arraycopy(that.history(h), 0, current, index, n) + prev = cuml + index += n + h += 1 + } + else more = false + } + if (h >= that.hIndex && current.length - index - 1>= that.index) { + if (that.index > 0) System.arraycopy(that.current, 0, current, index, that.index) + index += that.index + } + else { + val slots = (if (index > 0) 1 else 0) + that.hIndex - h + if (hIndex + slots > history.length) { + val n = math.max(4, 1 << (32 - java.lang.Integer.numberOfLeadingZeros(1 + hIndex + slots))) + history = java.util.Arrays.copyOf(history, n) + } + var pv = (if (hIndex > 0) cumulative(hIndex-1) else 0L) + if (index > 0) { + val x = + if (index < (current.length >>> 3) && current.length - 1 > 32) { + val ans = java.util.Arrays.copyOf(current, index + 1) + ans(ans.length - 1) = current(current.length - 1) + ans + } + else current + pv = pv + index + x(x.length - 1) = pv + history(hIndex) = x + hIndex += 1 + } + while (h < that.hIndex) { + val cuml = that.cumulative(h) + pv = pv + cuml - prev + prev = cuml + val x = that.history(h) + x(x.length - 1) = pv + history(hIndex) = x + h += 1 + hIndex += 1 + } + index = that.index + current = that.current + } + totalSize += that.totalSize + that.clear + } + + override def clear(): Unit = { + super.clear() + current = DoubleAccumulator.emptyDoubleArray + history = DoubleAccumulator.emptyDoubleArrayArray + } + + /** Retrieves the `ix`th element. */ + final def apply(ix: Long): Double = { + if (totalSize - ix <= index || hIndex == 0) current((ix - (totalSize - index)).toInt) + else { + val w = seekSlot(ix) + history((w >>> 32).toInt)((w & 0xFFFFFFFFL).toInt) + } + } + + /** Retrieves the `ix`th element, using an `Int` index. */ + final def apply(i: Int): Double = apply(i.toLong) + + /** Returns an `Iterator` over the contents of this `DoubleAccumulator`. The `Iterator` is not specialized. */ + final def iterator = new Iterator[Double] { + private var consumed = 0L + private var hix = 0 + private var ix = 0 + private var i = 0 + private var a: Array[Double] = DoubleAccumulator.emptyDoubleArray + @annotation.tailrec private def loadMore() { + if (hix >= self.hIndex) { + a = self.current + val pv = (if (self.hIndex == 0) 0L else self.cumulative(self.hIndex-1)) + ix = self.index + i = (consumed - pv).toInt + } + else { + val ch = self.cumulative(hix) + if (ch > consumed) { + a = self.history(hix) + val pv = (if (hix == 0) 0L else self.cumulative(hix-1)) + ix = (ch - pv).toInt + i = (consumed - pv).toInt + } + else { + hix += 1 + loadMore() + } + } + } + def hasNext = consumed < self.totalSize + def next = { + if (i >= ix) loadMore() + val ans = a(i) + i += 1 + consumed += 1 + ans + } + } + + /** Returns a `java.util.Spliterator.OfDouble` over the contents of this `DoubleAccumulator`*/ + final def spliterator: java.util.Spliterator.OfDouble = new DoubleAccumulatorSpliterator(this) + + /** Produces a sequential Java 8 `DoubleStream` over the elements of this `DoubleAccumulator`*/ + final def seqStream: java.util.stream.DoubleStream = java.util.stream.StreamSupport.doubleStream(spliterator, false) + + /** Produces a parallel Java 8 `DoubleStream` over the elements of this `DoubleAccumulator`*/ + final def parStream: java.util.stream.DoubleStream = java.util.stream.StreamSupport.doubleStream(spliterator, true) + + /** Copies the elements in this `DoubleAccumulator` into an `Array[Double]` */ + final def toArray = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for an array: "+totalSize.toString) + val a = new Array[Double](totalSize.toInt) + var j = 0 + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val cuml = x(x.length-1).toLong + val n = (cuml - pv).toInt + pv = cuml + System.arraycopy(x, 0, a, j, n) + j += n + h += 1 + } + System.arraycopy(current, 0, a, j, index) + j += index + a + } + + /** Copies the elements in this `DoubleAccumulator` to a `List` */ + final def toList: List[Double] = { + var ans: List[Double] = Nil + var i = index - 1 + while (i >= 0) { + ans = current(i) :: ans + i -= 1 + } + var h = hIndex - 1 + while (h >= 0) { + val a = history(h) + i = (cumulative(h) - (if (h == 0) 0L else cumulative(h-1))).toInt - 1 + while (i >= 0) { + ans = a(i) :: ans + i -= 1 + } + h -= 1 + } + ans + } + + /** Copies the elements in this `DoubleAccumulator` to a specified collection. + * Note that the target collection is not specialized. + * Usage example: `acc.to[Vector]` + */ + final def to[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, Double, Coll[Double]]): Coll[Double] = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for a Scala collection: "+totalSize.toString) + val b = cbf() + b.sizeHint(totalSize.toInt) + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val n = cumulative(h) - pv + pv = cumulative(h) + var i = 0 + while (i < n) { + b += x(i) + i += 1 + } + h += 1 + } + var i = 0 + while (i < index) { + b += current(i) + i += 1 + } + b.result + } +} +object DoubleAccumulator { + private val emptyDoubleArray = new Array[Double](0) + private val emptyDoubleArrayArray = new Array[Array[Double]](0) + + /** A `Supplier` of `DoubleAccumulator`s, suitable for use with `java.util.stream.DoubleStream`'s `collect` method. Suitable for `Stream[Double]` also. */ + def supplier = new java.util.function.Supplier[DoubleAccumulator]{ def get: DoubleAccumulator = new DoubleAccumulator } + + /** A `BiConsumer` that adds an element to an `Accumulator`, suitable for use with `java.util.stream.DoubleStream`'s `collect` method. */ + def adder = new java.util.function.ObjDoubleConsumer[DoubleAccumulator]{ def accept(ac: DoubleAccumulator, a: Double) { ac += a } } + + /** A `BiConsumer` that adds a boxed `Double` to an `DoubleAccumulator`, suitable for use with `java.util.stream.Stream`'s `collect` method. */ + def boxedAdder = new java.util.function.BiConsumer[DoubleAccumulator, Double]{ def accept(ac: DoubleAccumulator, a: Double) { ac += a } } + + /** A `BiConsumer` that merges `DoubleAccumulator`s, suitable for use with `java.util.stream.DoubleStream`'s `collect` method. Suitable for `Stream[Double]` also. */ + def merger = new java.util.function.BiConsumer[DoubleAccumulator, DoubleAccumulator]{ def accept(a1: DoubleAccumulator, a2: DoubleAccumulator) { a1 drain a2 } } +} + +private[java8] class DoubleAccumulatorSpliterator(private val acc: DoubleAccumulator) extends java.util.Spliterator.OfDouble { + import java.util.Spliterator._ + + private var h = 0 + private var i = 0 + private var a = if (acc.hIndex > 0) acc.history(0) else acc.current + private var n = if (acc.hIndex > 0) acc.cumulative(0) else acc.index + private var N = acc.totalSize + + private def duplicateSelf(limit: Long = N): DoubleAccumulatorSpliterator = { + val ans = new DoubleAccumulatorSpliterator(acc) + ans.h = h + ans.i = i + ans.a = a + ans.n = n + ans.N = limit + ans + } + + private def loadMore(): Unit = { + h += 1 + if (h < acc.hIndex) { a = acc.history(h); n = acc.cumulative(h) - acc.cumulative(h-1) } + else { a = acc.current; n = acc.index } + i = 0 + } + + def characteristics = ORDERED | SIZED | SUBSIZED + + def estimateSize = N + + override def forEachRemaining(f: java.util.function.DoubleConsumer) { + while (N > 0) { + if (i >= n) loadMore() + val i0 = i + if ((n-i) > N) n = i + N.toInt + while (i < n) { + f.accept(a(i)) + i += 1 + } + N -= (n - i0) + } + } + + def tryAdvance(f: java.util.function.DoubleConsumer): Boolean = + if (N <= 0) false + else { + if (i >= n) loadMore() + f.accept(a(i)) + i += 1 + N -= 1 + true + } + + def trySplit: java.util.Spliterator.OfDouble = + if (N <= 1) null + else { + val half = (N >> 1) + val M = (if (h <= 0) 0L else acc.cumulative(h-1)) + i + val R = M + half + val ans = duplicateSelf(half) + if (h < acc.hIndex) { + val w = acc.seekSlot(R) + h = (w >>> 32).toInt + if (h < acc.hIndex) { + a = acc.history(h) + n = acc.cumulative(h) - (if (h > 0) acc.cumulative(h-1) else 0) + } + else { + a = acc.current + n = acc.index + } + i = (w & 0xFFFFFFFFL).toInt + } + else i += half.toInt + N -= half + ans + } +} diff --git a/src/main/scala/scala/compat/java8/collectionCompat/IntAccumulator.scala b/src/main/scala/scala/compat/java8/collectionCompat/IntAccumulator.scala new file mode 100644 index 0000000..ea658a3 --- /dev/null +++ b/src/main/scala/scala/compat/java8/collectionCompat/IntAccumulator.scala @@ -0,0 +1,333 @@ +package scala.compat.java8.collectionCompat + +/** A `IntAccumulator` is a low-level collection specialized for gathering + * elements in parallel and then joining them in order by merging them. + * This is a manually specialized variant of `Accumulator` with no actual + * subclassing relationship with `Accumulator`. + */ +final class IntAccumulator +extends AccumulatorLike[Int, IntAccumulator] +{ self => + private[java8] var current: Array[Int] = IntAccumulator.emptyIntArray + private[java8] var history: Array[Array[Int]] = IntAccumulator.emptyIntArrayArray + + private[java8] def cumulative(i: Int) = { val x = history(i); x(x.length-2).toLong << 32 | (x(x.length-1)&0xFFFFFFFFL) } + + private def expand(): Unit = { + if (index > 0) { + val cuml = (if (hIndex > 0) cumulative(hIndex-1) else 0) + index + current(current.length-2) = (cuml >>> 32).toInt + current(current.length-1) = (cuml & 0xFFFFFFFFL).toInt + if (hIndex >= history.length) hExpand() + history(hIndex) = current + hIndex += 1 + } + current = new Array[Int](nextBlockSize+1) + index = 0 + } + + private def hExpand(): Unit = { + if (hIndex == 0) history = new Array[Array[Int]](4) + else history = java.util.Arrays.copyOf(history, history.length << 1) + } + + /** Appends an element to this `IntAccumulator`. */ + final def +=(a: Int): Unit = { + totalSize += 1 + if (index+2 >= current.length) expand() + current(index) = a + index += 1 + } + + /** Removes all elements from `that` and appends them to this `IntAccumulator`. */ + final def drain(that: IntAccumulator): Unit = { + var h = 0 + var prev = 0L + var more = true + while (more && h < that.hIndex) { + val cuml = that.cumulative(h) + val n = (cuml - prev).toInt + if (current.length - index - 2 >= n) { + System.arraycopy(that.history(h), 0, current, index, n) + prev = cuml + index += n + h += 1 + } + else more = false + } + if (h >= that.hIndex && current.length - index - 2 >= that.index) { + if (that.index > 0) System.arraycopy(that.current, 0, current, index, that.index) + index += that.index + } + else { + val slots = (if (index > 0) 1 else 0) + that.hIndex - h + if (hIndex + slots > history.length) { + val n = math.max(4, 1 << (32 - java.lang.Integer.numberOfLeadingZeros(1 + hIndex + slots))) + history = java.util.Arrays.copyOf(history, n) + } + var pv = (if (hIndex > 0) cumulative(hIndex-1) else 0L) + if (index > 0) { + val x = + if (index < (current.length >>> 3) && current.length - 1 > 32) { + val ans = java.util.Arrays.copyOf(current, index + 2) + ans(ans.length - 2) = current(current.length - 2) + ans(ans.length - 1) = current(current.length - 1) + ans + } + else current + pv = pv + index + x(x.length - 2) = (pv >>> 32).toInt + x(x.length - 1) = (pv & 0xFFFFFFFFL).toInt + history(hIndex) = x + hIndex += 1 + } + while (h < that.hIndex) { + val cuml = that.cumulative(h) + pv = pv + cuml - prev + prev = cuml + val x = that.history(h) + x(x.length - 2) = (pv >>> 32).toInt + x(x.length - 1) = (pv & 0xFFFFFFFFL).toInt + history(hIndex) = x + h += 1 + hIndex += 1 + } + index = that.index + current = that.current + } + totalSize += that.totalSize + that.clear + } + + override def clear(): Unit = { + super.clear() + current = IntAccumulator.emptyIntArray + history = IntAccumulator.emptyIntArrayArray + } + + /** Retrieves the `ix`th element. */ + final def apply(ix: Long): Int = { + if (totalSize - ix <= index || hIndex == 0) current((ix - (totalSize - index)).toInt) + else { + val w = seekSlot(ix) + history((w >>> 32).toInt)((w & 0xFFFFFFFFL).toInt) + } + } + + /** Retrieves the `ix`th element, using an `Int` index. */ + final def apply(i: Int): Int = apply(i.toLong) + + /** Returns an `Iterator` over the contents of this `IntAccumulator`. The `Iterator` is not specialized. */ + final def iterator = new Iterator[Int] { + private var consumed = 0L + private var hix = 0 + private var ix = 0 + private var i = 0 + private var a: Array[Int] = IntAccumulator.emptyIntArray + @annotation.tailrec private def loadMore() { + if (hix >= self.hIndex) { + a = self.current + val pv = (if (self.hIndex == 0) 0L else self.cumulative(self.hIndex-1)) + ix = self.index + i = (consumed - pv).toInt + } + else { + val ch = self.cumulative(hix) + if (ch > consumed) { + a = self.history(hix) + val pv = (if (hix == 0) 0L else self.cumulative(hix-1)) + ix = (ch - pv).toInt + i = (consumed - pv).toInt + } + else { + hix += 1 + loadMore() + } + } + } + def hasNext = consumed < self.totalSize + def next = { + if (i >= ix) loadMore() + val ans = a(i) + i += 1 + consumed += 1 + ans + } + } + + /** Returns a `java.util.Spliterator.OfInt` over the contents of this `IntAccumulator`*/ + final def spliterator: java.util.Spliterator.OfInt = new IntAccumulatorSpliterator(this) + + /** Produces a sequential Java 8 `IntStream` over the elements of this `IntAccumulator`*/ + final def seqStream: java.util.stream.IntStream = java.util.stream.StreamSupport.intStream(spliterator, false) + + /** Produces a parallel Java 8 `IntStream` over the elements of this `IntAccumulator`*/ + final def parStream: java.util.stream.IntStream = java.util.stream.StreamSupport.intStream(spliterator, true) + + /** Copies the elements in this `IntAccumulator` into an `Array[Int]` */ + final def toArray = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for an array: "+totalSize.toString) + val a = new Array[Int](totalSize.toInt) + var j = 0 + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val cuml = cumulative(h) + val n = (cuml - pv).toInt + pv = cuml + System.arraycopy(x, 0, a, j, n) + j += n + h += 1 + } + System.arraycopy(current, 0, a, j, index) + j += index + a + } + + /** Copies the elements in this `IntAccumulator` to a `List` */ + final def toList: List[Int] = { + var ans: List[Int] = Nil + var i = index - 1 + while (i >= 0) { + ans = current(i) :: ans + i -= 1 + } + var h = hIndex - 1 + while (h >= 0) { + val a = history(h) + i = (cumulative(h) - (if (h == 0) 0L else cumulative(h-1))).toInt - 1 + while (i >= 0) { + ans = a(i) :: ans + i -= 1 + } + h -= 1 + } + ans + } + + /** Copies the elements in this `IntAccumulator` to a specified collection. + * Note that the target collection is not specialized. + * Usage example: `acc.to[Vector]` + */ + final def to[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, Int, Coll[Int]]): Coll[Int] = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for a Scala collection: "+totalSize.toString) + val b = cbf() + b.sizeHint(totalSize.toInt) + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val cuml = cumulative(h) + val n = cuml - pv + pv = cuml + var i = 0 + while (i < n) { + b += x(i) + i += 1 + } + h += 1 + } + var i = 0 + while (i < index) { + b += current(i) + i += 1 + } + b.result + } +} +object IntAccumulator { + private val emptyIntArray = new Array[Int](0) + private val emptyIntArrayArray = new Array[Array[Int]](0) + + /** A `Supplier` of `IntAccumulator`s, suitable for use with `java.util.stream.IntStream`'s `collect` method. Suitable for `Stream[Int]` also. */ + def supplier = new java.util.function.Supplier[IntAccumulator]{ def get: IntAccumulator = new IntAccumulator } + + /** A `BiConsumer` that adds an element to an `Accumulator`, suitable for use with `java.util.stream.IntStream`'s `collect` method. */ + def adder = new java.util.function.ObjIntConsumer[IntAccumulator]{ def accept(ac: IntAccumulator, a: Int) { ac += a } } + + /** A `BiConsumer` that adds a boxed `Int` to an `IntAccumulator`, suitable for use with `java.util.stream.Stream`'s `collect` method. */ + def boxedAdder = new java.util.function.BiConsumer[IntAccumulator, Int]{ def accept(ac: IntAccumulator, a: Int) { ac += a } } + + /** A `BiConsumer` that merges `IntAccumulator`s, suitable for use with `java.util.stream.IntStream`'s `collect` method. Suitable for `Stream[Int]` also. */ + def merger = new java.util.function.BiConsumer[IntAccumulator, IntAccumulator]{ def accept(a1: IntAccumulator, a2: IntAccumulator) { a1 drain a2 } } +} + +private[java8] class IntAccumulatorSpliterator(private val acc: IntAccumulator) extends java.util.Spliterator.OfInt { + import java.util.Spliterator._ + + private var h = 0 + private var i = 0 + private var a = if (acc.hIndex > 0) acc.history(0) else acc.current + private var n = if (acc.hIndex > 0) acc.cumulative(0) else acc.index + private var N = acc.totalSize + + private def duplicateSelf(limit: Long = N): IntAccumulatorSpliterator = { + val ans = new IntAccumulatorSpliterator(acc) + ans.h = h + ans.i = i + ans.a = a + ans.n = n + ans.N = limit + ans + } + + private def loadMore(): Unit = { + h += 1 + if (h < acc.hIndex) { a = acc.history(h); n = acc.cumulative(h) - acc.cumulative(h-1) } + else { a = acc.current; n = acc.index } + i = 0 + } + + def characteristics = ORDERED | SIZED | SUBSIZED + + def estimateSize = N + + override def forEachRemaining(f: java.util.function.IntConsumer) { + while (N > 0) { + if (i >= n) loadMore() + val i0 = i + if ((n-i) > N) n = i + N.toInt + while (i < n) { + f.accept(a(i)) + i += 1 + } + N -= (n - i0) + } + } + + def tryAdvance(f: java.util.function.IntConsumer): Boolean = + if (N <= 0) false + else { + if (i >= n) loadMore() + f.accept(a(i)) + i += 1 + N -= 1 + true + } + + def trySplit: java.util.Spliterator.OfInt = + if (N <= 1) null + else { + val half = (N >> 1) + val M = (if (h <= 0) 0L else acc.cumulative(h-1)) + i + val R = M + half + val ans = duplicateSelf(half) + if (h < acc.hIndex) { + val w = acc.seekSlot(R) + h = (w >>> 32).toInt + if (h < acc.hIndex) { + a = acc.history(h) + n = acc.cumulative(h) - (if (h > 0) acc.cumulative(h-1) else 0) + } + else { + a = acc.current + n = acc.index + } + i = (w & 0xFFFFFFFFL).toInt + } + else i += half.toInt + N -= half + ans + } +} diff --git a/src/main/scala/scala/compat/java8/collectionCompat/LongAccumulator.scala b/src/main/scala/scala/compat/java8/collectionCompat/LongAccumulator.scala new file mode 100644 index 0000000..f976b08 --- /dev/null +++ b/src/main/scala/scala/compat/java8/collectionCompat/LongAccumulator.scala @@ -0,0 +1,327 @@ +package scala.compat.java8.collectionCompat + +/** A `LongAccumulator` is a low-level collection specialized for gathering + * elements in parallel and then joining them in order by merging them. + * This is a manually specialized variant of `Accumulator` with no actual + * subclassing relationship with `Accumulator`. + */ +final class LongAccumulator +extends AccumulatorLike[Long, LongAccumulator] +{ self => + private[java8] var current: Array[Long] = LongAccumulator.emptyLongArray + private[java8] var history: Array[Array[Long]] = LongAccumulator.emptyLongArrayArray + + private[java8] def cumulative(i: Int) = { val x = history(i); x(x.length-1) } + + private def expand(): Unit = { + if (index > 0) { + current(current.length-1) = (if (hIndex > 0) { val x = history(hIndex-1); x(x.length-1) } else 0) + index + if (hIndex >= history.length) hExpand() + history(hIndex) = current + hIndex += 1 + } + current = new Array[Long](nextBlockSize+1) + index = 0 + } + + private def hExpand(): Unit = { + if (hIndex == 0) history = new Array[Array[Long]](4) + else history = java.util.Arrays.copyOf(history, history.length << 1) + } + + /** Appends an element to this `LongAccumulator`. */ + final def +=(a: Long): Unit = { + totalSize += 1 + if (index+1 >= current.length) expand() + current(index) = a + index += 1 + } + + /** Removes all elements from `that` and appends them to this `LongAccumulator`. */ + final def drain(that: LongAccumulator): Unit = { + var h = 0 + var prev = 0L + var more = true + while (more && h < that.hIndex) { + val cuml = that.cumulative(h) + val n = (cuml - prev).toInt + if (current.length - index - 1 >= n) { + System.arraycopy(that.history(h), 0, current, index, n) + prev = cuml + index += n + h += 1 + } + else more = false + } + if (h >= that.hIndex && current.length - index - 1>= that.index) { + if (that.index > 0) System.arraycopy(that.current, 0, current, index, that.index) + index += that.index + } + else { + val slots = (if (index > 0) 1 else 0) + that.hIndex - h + if (hIndex + slots > history.length) { + val n = math.max(4, 1 << (32 - java.lang.Integer.numberOfLeadingZeros(1 + hIndex + slots))) + history = java.util.Arrays.copyOf(history, n) + } + var pv = (if (hIndex > 0) cumulative(hIndex-1) else 0L) + if (index > 0) { + val x = + if (index < (current.length >>> 3) && current.length - 1 > 32) { + val ans = java.util.Arrays.copyOf(current, index + 1) + ans(ans.length - 1) = current(current.length - 1) + ans + } + else current + pv = pv + index + x(x.length - 1) = pv + history(hIndex) = x + hIndex += 1 + } + while (h < that.hIndex) { + val cuml = that.cumulative(h) + pv = pv + cuml - prev + prev = cuml + val x = that.history(h) + x(x.length - 1) = pv + history(hIndex) = x + h += 1 + hIndex += 1 + } + index = that.index + current = that.current + } + totalSize += that.totalSize + that.clear + } + + override def clear(): Unit = { + super.clear() + current = LongAccumulator.emptyLongArray + history = LongAccumulator.emptyLongArrayArray + } + + /** Retrieves the `ix`th element. */ + final def apply(ix: Long): Long = { + if (totalSize - ix <= index || hIndex == 0) current((ix - (totalSize - index)).toInt) + else { + val w = seekSlot(ix) + history((w >>> 32).toInt)((w & 0xFFFFFFFFL).toInt) + } + } + + /** Retrieves the `ix`th element, using an `Int` index. */ + final def apply(i: Int): Long = apply(i.toLong) + + /** Returns an `Iterator` over the contents of this `LongAccumulator`. The `Iterator` is not specialized. */ + final def iterator = new Iterator[Long] { + private var consumed = 0L + private var hix = 0 + private var ix = 0 + private var i = 0 + private var a: Array[Long] = LongAccumulator.emptyLongArray + @annotation.tailrec private def loadMore() { + if (hix >= self.hIndex) { + a = self.current + val pv = (if (self.hIndex == 0) 0L else self.cumulative(self.hIndex-1)) + ix = self.index + i = (consumed - pv).toInt + } + else { + val ch = self.cumulative(hix) + if (ch > consumed) { + a = self.history(hix) + val pv = (if (hix == 0) 0L else self.cumulative(hix-1)) + ix = (ch - pv).toInt + i = (consumed - pv).toInt + } + else { + hix += 1 + loadMore() + } + } + } + def hasNext = consumed < self.totalSize + def next = { + if (i >= ix) loadMore() + val ans = a(i) + i += 1 + consumed += 1 + ans + } + } + + /** Returns a `java.util.Spliterator.OfLong` over the contents of this `LongAccumulator`*/ + final def spliterator: java.util.Spliterator.OfLong = new LongAccumulatorSpliterator(this) + + /** Produces a sequential Java 8 `LongStream` over the elements of this `LongAccumulator`*/ + final def seqStream: java.util.stream.LongStream = java.util.stream.StreamSupport.longStream(spliterator, false) + + /** Produces a parallel Java 8 `LongStream` over the elements of this `LongAccumulator`*/ + final def parStream: java.util.stream.LongStream = java.util.stream.StreamSupport.longStream(spliterator, true) + + /** Copies the elements in this `LongAccumulator` into an `Array[Long]` */ + final def toArray = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for an array: "+totalSize.toString) + val a = new Array[Long](totalSize.toInt) + var j = 0 + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val cuml = x(x.length-1) + val n = (cuml - pv).toInt + pv = cuml + System.arraycopy(x, 0, a, j, n) + j += n + h += 1 + } + System.arraycopy(current, 0, a, j, index) + j += index + a + } + + /** Copies the elements in this `LongAccumulator` to a `List` */ + final def toList: List[Long] = { + var ans: List[Long] = Nil + var i = index - 1 + while (i >= 0) { + ans = current(i) :: ans + i -= 1 + } + var h = hIndex - 1 + while (h >= 0) { + val a = history(h) + i = (cumulative(h) - (if (h == 0) 0L else cumulative(h-1))).toInt - 1 + while (i >= 0) { + ans = a(i) :: ans + i -= 1 + } + h -= 1 + } + ans + } + + /** Copies the elements in this `LongAccumulator` to a specified collection. + * Note that the target collection is not specialized. + * Usage example: `acc.to[Vector]` + */ + final def to[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, Long, Coll[Long]]): Coll[Long] = { + if (totalSize > Int.MaxValue) throw new IllegalArgumentException("Too many elements accumulated for a Scala collection: "+totalSize.toString) + val b = cbf() + b.sizeHint(totalSize.toInt) + var h = 0 + var pv = 0L + while (h < hIndex) { + val x = history(h) + val n = cumulative(h) - pv + pv = cumulative(h) + var i = 0 + while (i < n) { + b += x(i) + i += 1 + } + h += 1 + } + var i = 0 + while (i < index) { + b += current(i) + i += 1 + } + b.result + } +} +object LongAccumulator { + private val emptyLongArray = new Array[Long](0) + private val emptyLongArrayArray = new Array[Array[Long]](0) + + /** A `Supplier` of `LongAccumulator`s, suitable for use with `java.util.stream.LongStream`'s `collect` method. Suitable for `Stream[Long]` also. */ + def supplier = new java.util.function.Supplier[LongAccumulator]{ def get: LongAccumulator = new LongAccumulator } + + /** A `BiConsumer` that adds an element to an `Accumulator`, suitable for use with `java.util.stream.LongStream`'s `collect` method. */ + def adder = new java.util.function.ObjLongConsumer[LongAccumulator]{ def accept(ac: LongAccumulator, a: Long) { ac += a } } + + /** A `BiConsumer` that adds a boxed `Long` to an `LongAccumulator`, suitable for use with `java.util.stream.Stream`'s `collect` method. */ + def boxedAdder = new java.util.function.BiConsumer[LongAccumulator, Long]{ def accept(ac: LongAccumulator, a: Long) { ac += a } } + + /** A `BiConsumer` that merges `LongAccumulator`s, suitable for use with `java.util.stream.LongStream`'s `collect` method. Suitable for `Stream[Long]` also. */ + def merger = new java.util.function.BiConsumer[LongAccumulator, LongAccumulator]{ def accept(a1: LongAccumulator, a2: LongAccumulator) { a1 drain a2 } } +} + +private[java8] class LongAccumulatorSpliterator(private val acc: LongAccumulator) extends java.util.Spliterator.OfLong { + import java.util.Spliterator._ + + private var h = 0 + private var i = 0 + private var a = if (acc.hIndex > 0) acc.history(0) else acc.current + private var n = if (acc.hIndex > 0) acc.cumulative(0) else acc.index + private var N = acc.totalSize + + private def duplicateSelf(limit: Long = N): LongAccumulatorSpliterator = { + val ans = new LongAccumulatorSpliterator(acc) + ans.h = h + ans.i = i + ans.a = a + ans.n = n + ans.N = limit + ans + } + + private def loadMore(): Unit = { + h += 1 + if (h < acc.hIndex) { a = acc.history(h); n = acc.cumulative(h) - acc.cumulative(h-1) } + else { a = acc.current; n = acc.index } + i = 0 + } + + def characteristics = ORDERED | SIZED | SUBSIZED + + def estimateSize = N + + override def forEachRemaining(f: java.util.function.LongConsumer) { + while (N > 0) { + if (i >= n) loadMore() + val i0 = i + if ((n-i) > N) n = i + N.toInt + while (i < n) { + f.accept(a(i)) + i += 1 + } + N -= (n - i0) + } + } + + def tryAdvance(f: java.util.function.LongConsumer): Boolean = + if (N <= 0) false + else { + if (i >= n) loadMore() + f.accept(a(i)) + i += 1 + N -= 1 + true + } + + def trySplit: java.util.Spliterator.OfLong = + if (N <= 1) null + else { + val half = (N >> 1) + val M = (if (h <= 0) 0L else acc.cumulative(h-1)) + i + val R = M + half + val ans = duplicateSelf(half) + if (h < acc.hIndex) { + val w = acc.seekSlot(R) + h = (w >>> 32).toInt + if (h < acc.hIndex) { + a = acc.history(h) + n = acc.cumulative(h) - (if (h > 0) acc.cumulative(h-1) else 0) + } + else { + a = acc.current + n = acc.index + } + i = (w & 0xFFFFFFFFL).toInt + } + else i += half.toInt + N -= half + ans + } +} diff --git a/src/test/scala/scala/compat/java8/StreamConvertersTest.scala b/src/test/scala/scala/compat/java8/StreamConvertersTest.scala new file mode 100644 index 0000000..17d65eb --- /dev/null +++ b/src/test/scala/scala/compat/java8/StreamConvertersTest.scala @@ -0,0 +1,145 @@ +package scala.compat.java8 + +import org.junit.Test +import org.junit.Assert._ + +class StreamConvertersTest { + import java.util.stream._ + import StreamConverters._ + + def assertEq[A](a1: A, a2: A, s: String) { assertEquals(s, a1, a2) } // Weird order normally! + def assertEq[A](a1: A, a2: A) { assertEq(a1, a2, "not equal") } + + def arrayO(n: Int) = (1 to n).map(_.toString).toArray + def arrayD(n: Int) = (1 to n).map(_.toDouble).toArray + def arrayI(n: Int) = (1 to n).toArray + def arrayL(n: Int) = (1 to n).map(_.toLong).toArray + + def newStream(n: Int) = java.util.Arrays.stream(arrayO(n)) + def newDoubleStream(n: Int) = java.util.Arrays.stream(arrayD(n)) + def newIntStream(n: Int) = java.util.Arrays.stream(arrayI(n)) + def newLongStream(n: Int) = java.util.Arrays.stream(arrayL(n)) + + val ns = Vector(0, 1, 2, 12, 15, 16, 17, 31, 32, 33, 151, 1298, 7159) + + @Test + def streamAccumulate() { + for (n <- ns) { + val vecO = arrayO(n).toVector + val accO = newStream(n).parallel.accumulate + assertEq(vecO, newStream(n).accumulate.to[Vector], s"stream $n to vector") + assertEq(vecO, accO.to[Vector], s"stream $n to vector in parallel") + assertEq(vecO, accO.toArray.toVector, s"stream $n to vector via array in parallel") + assertEq(vecO, accO.iterator.toVector, s"stream $n to vector via iterator in parallel") + assertEq(vecO, accO.toList.toVector, s"stream $n to vector via list in parallel") + assert((0 until accO.size.toInt).forall(i => vecO(i) == accO(i)), s"stream $n indexed via accumulator") + assert(accO.isInstanceOf[scala.compat.java8.collectionCompat.Accumulator[_]], s"stream $n to generic accumulator") + + for (boxless <- Seq(false, true)) { + val sbox = (if (boxless) "" else "(boxed)") + val vecD = arrayD(n).toVector + val accD = + if (boxless) newDoubleStream(n).parallel.accumulate + else newDoubleStream(n).boxed.parallel.accumulatePrimitive + assertEq(vecD, newDoubleStream(n).accumulate.to[Vector], s"double stream $n to vector $sbox") + assertEq(vecD, accD.to[Vector], s"double stream $n to vector in parallel $sbox") + assertEq(vecD, accD.toArray.toVector, s"double stream $n to vector via array in parallel $sbox") + assertEq(vecD, accD.iterator.toVector, s"double stream $n to vector via iterator in parallel $sbox") + assertEq(vecD, accD.toList.toVector, s"double stream $n to vector via list in parallel $sbox") + assert((0 until accD.size.toInt).forall(i => vecD(i) == accD(i)), s"double stream $n indexed via accumulator $sbox") + assert(accD.isInstanceOf[scala.compat.java8.collectionCompat.DoubleAccumulator], s"double stream $n to generic accumulator $sbox") + + val vecI = arrayI(n).toVector + val accI = + if (boxless) newIntStream(n).parallel.accumulate + else newIntStream(n).boxed.parallel.accumulatePrimitive + assertEq(vecI, newIntStream(n).accumulate.to[Vector], s"int stream $n to vector $sbox") + assertEq(vecI, accI.to[Vector], s"int stream $n to vector in parallel $sbox") + assertEq(vecI, accI.toArray.toVector, s"int stream $n to vector via array in parallel $sbox") + assertEq(vecI, accI.iterator.toVector, s"int stream $n to vector via iterator in parallel $sbox") + assertEq(vecI, accI.toList.toVector, s"int stream $n to vector via list in parallel $sbox") + assert((0 until accI.size.toInt).forall(i => vecI(i) == accI(i)), s"int stream $n indexed via accumulator $sbox") + assert(accI.isInstanceOf[scala.compat.java8.collectionCompat.IntAccumulator], s"int stream $n to generic accumulator $sbox") + + val vecL = arrayL(n).toVector + val accL = + if (boxless) newLongStream(n).parallel.accumulate + else newLongStream(n).boxed.parallel.accumulatePrimitive + assertEq(vecL, newLongStream(n).accumulate.to[Vector], s"long stream $n to vector $sbox") + assertEq(vecL, accL.to[Vector], s"long stream $n to vector in parallel $sbox") + assertEq(vecL, accL.toArray.toVector, s"long stream $n to vector via array in parallel $sbox") + assertEq(vecL, accL.iterator.toVector, s"long stream $n to vector via iterator in parallel $sbox") + assertEq(vecL, accL.toList.toVector, s"long stream $n to vector via list in parallel $sbox") + assert((0 until accL.size.toInt).forall(i => vecL(i) == accL(i)), s"long stream $n indexed via accumulator $sbox") + assert(accL.isInstanceOf[scala.compat.java8.collectionCompat.LongAccumulator], s"long stream $n to generic accumulator $sbox") + } + } + } + + @Test + def streamToScala() { + for (n <- ns) { + val vecO = arrayO(n).toVector + assertEq(vecO, newStream(n).toScala[Vector]) + assertEq(vecO, newStream(n).parallel.toScala[Vector]) + + val vecD = arrayD(n).toVector + assertEq(vecD, newDoubleStream(n).toScala[Vector]) + assertEq(vecD, newDoubleStream(n).parallel.toScala[Vector]) + + val vecI = arrayI(n).toVector + assertEq(vecI, newIntStream(n).toScala[Vector]) + assertEq(vecI, newIntStream(n).parallel.toScala[Vector]) + + val vecL = arrayL(n).toVector + assertEq(vecL, newLongStream(n).toScala[Vector]) + assertEq(vecL, newLongStream(n).parallel.toScala[Vector]) + } + } + + @Test + def streamUnbox() { + assert(newDoubleStream(1).boxed.unboxed.isInstanceOf[DoubleStream]) + assert(newIntStream(1).boxed.unboxed.isInstanceOf[IntStream]) + assert(newLongStream(1).boxed.unboxed.isInstanceOf[LongStream]) + } + + @Test + def scalaToStream() { + for (n <- ns) { + val arrO = arrayO(n) + val seqO = arrO.toSeq + assertEq(seqO, seqO.seqStream.toScala[Seq]) + assertEq(seqO, seqO.parStream.toScala[Seq]) + assertEq(seqO, arrO.seqStream.toScala[Seq]) + assertEq(seqO, arrO.parStream.toScala[Seq]) + + val arrD = arrayD(n) + val seqD = arrD.toSeq + assertEq(seqD, seqD.seqStream.toScala[Seq]) + assertEq(seqD, seqD.parStream.toScala[Seq]) + assertEq(seqD, arrD.seqStream.toScala[Seq]) + assertEq(seqD, arrD.parStream.toScala[Seq]) + assert(arrD.seqStream.isInstanceOf[DoubleStream]) + assert(arrD.parStream.isInstanceOf[DoubleStream]) + + val arrI = arrayI(n) + val seqI = arrI.toSeq + assertEq(seqI, seqI.seqStream.toScala[Seq]) + assertEq(seqI, seqI.parStream.toScala[Seq]) + assertEq(seqI, arrI.seqStream.toScala[Seq]) + assertEq(seqI, arrI.parStream.toScala[Seq]) + assert(arrI.seqStream.isInstanceOf[IntStream]) + assert(arrI.parStream.isInstanceOf[IntStream]) + + val arrL = arrayL(n) + val seqL = arrL.toSeq + assertEq(seqL, seqL.seqStream.toScala[Seq]) + assertEq(seqL, seqL.parStream.toScala[Seq]) + assertEq(seqL, arrL.seqStream.toScala[Seq]) + assertEq(seqL, arrL.parStream.toScala[Seq]) + assert(arrL.seqStream.isInstanceOf[LongStream]) + assert(arrL.parStream.isInstanceOf[LongStream]) + } + } +}