|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
package java.util.stream; |
|
|
|
import java.util.DoubleSummaryStatistics; |
|
import java.util.Objects; |
|
import java.util.OptionalDouble; |
|
import java.util.PrimitiveIterator; |
|
import java.util.Spliterator; |
|
import java.util.Spliterators; |
|
import java.util.function.BiConsumer; |
|
import java.util.function.BinaryOperator; |
|
import java.util.function.DoubleBinaryOperator; |
|
import java.util.function.DoubleConsumer; |
|
import java.util.function.DoubleFunction; |
|
import java.util.function.DoublePredicate; |
|
import java.util.function.DoubleToIntFunction; |
|
import java.util.function.DoubleToLongFunction; |
|
import java.util.function.DoubleUnaryOperator; |
|
import java.util.function.IntFunction; |
|
import java.util.function.ObjDoubleConsumer; |
|
import java.util.function.Supplier; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
abstract class DoublePipeline<E_IN> |
|
extends AbstractPipeline<E_IN, Double, DoubleStream> |
|
implements DoubleStream { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
/**
 * Constructs a pipeline stage whose source spliterator is obtained from a
 * supplier. All arguments are handed unchanged to the superclass
 * constructor.
 *
 * @param source supplier of the source spliterator
 * @param sourceFlags flags describing the source
 * @param parallel whether the pipeline is parallel
 */
DoublePipeline(Supplier<? extends Spliterator<Double>> source,
               int sourceFlags,
               boolean parallel) {
    super(source, sourceFlags, parallel);
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
/**
 * Constructs a pipeline stage directly over a source spliterator. All
 * arguments are handed unchanged to the superclass constructor.
 *
 * @param source the source spliterator
 * @param sourceFlags flags describing the source
 * @param parallel whether the pipeline is parallel
 */
DoublePipeline(Spliterator<Double> source,
               int sourceFlags,
               boolean parallel) {
    super(source, sourceFlags, parallel);
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
/**
 * Constructs a pipeline stage that is chained onto an existing upstream
 * stage. Both arguments are handed unchanged to the superclass
 * constructor.
 *
 * @param upstream the stage whose output feeds this one
 * @param opFlags operation flags for this stage
 */
DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream,
               int opFlags) {
    super(upstream, opFlags);
}
|
|
|
|
|
|
|
|
|
*/ |
|
private static DoubleConsumer adapt(Sink<Double> sink) { |
|
if (sink instanceof DoubleConsumer) { |
|
return (DoubleConsumer) sink; |
|
} else { |
|
if (Tripwire.ENABLED) |
|
Tripwire.trip(AbstractPipeline.class, |
|
"using DoubleStream.adapt(Sink<Double> s)"); |
|
return sink::accept; |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
private static Spliterator.OfDouble adapt(Spliterator<Double> s) { |
|
if (s instanceof Spliterator.OfDouble) { |
|
return (Spliterator.OfDouble) s; |
|
} else { |
|
if (Tripwire.ENABLED) |
|
Tripwire.trip(AbstractPipeline.class, |
|
"using DoubleStream.adapt(Spliterator<Double> s)"); |
|
throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)"); |
|
} |
|
} |
|
|
|
|
|
// Shape-specific methods |
|
|
|
@Override |
|
final StreamShape getOutputShape() { |
|
return StreamShape.DOUBLE_VALUE; |
|
} |
|
|
|
@Override |
|
final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper, |
|
Spliterator<P_IN> spliterator, |
|
boolean flattenTree, |
|
IntFunction<Double[]> generator) { |
|
return Nodes.collectDouble(helper, spliterator, flattenTree); |
|
} |
|
|
|
@Override |
|
final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph, |
|
Supplier<Spliterator<P_IN>> supplier, |
|
boolean isParallel) { |
|
return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel); |
|
} |
|
|
|
@Override |
|
@SuppressWarnings("unchecked") |
|
final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) { |
|
return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier); |
|
} |
|
|
|
@Override |
|
final boolean forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) { |
|
Spliterator.OfDouble spl = adapt(spliterator); |
|
DoubleConsumer adaptedSink = adapt(sink); |
|
boolean cancelled; |
|
do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink)); |
|
return cancelled; |
|
} |
|
|
|
@Override |
|
final Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) { |
|
return Nodes.doubleBuilder(exactSizeIfKnown); |
|
} |
|
|
|
private <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper, int opFlags) { |
|
return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE, opFlags) { |
|
@Override |
|
Sink<Double> opWrapSink(int flags, Sink<U> sink) { |
|
return new Sink.ChainedDouble<U>(sink) { |
|
@Override |
|
public void accept(double t) { |
|
downstream.accept(mapper.apply(t)); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
// DoubleStream |
|
|
|
@Override |
|
public final PrimitiveIterator.OfDouble iterator() { |
|
return Spliterators.iterator(spliterator()); |
|
} |
|
|
|
@Override |
|
public final Spliterator.OfDouble spliterator() { |
|
return adapt(super.spliterator()); |
|
} |
|
|
|
// Stateless intermediate ops from DoubleStream |
|
|
|
@Override |
|
public final Stream<Double> boxed() { |
|
return mapToObj(Double::valueOf, 0); |
|
} |
|
|
|
@Override |
|
public final DoubleStream map(DoubleUnaryOperator mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { |
|
@Override |
|
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { |
|
return new Sink.ChainedDouble<Double>(sink) { |
|
@Override |
|
public void accept(double t) { |
|
downstream.accept(mapper.applyAsDouble(t)); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) { |
|
Objects.requireNonNull(mapper); |
|
return mapToObj(mapper, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT); |
|
} |
|
|
|
@Override |
|
public final IntStream mapToInt(DoubleToIntFunction mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { |
|
@Override |
|
Sink<Double> opWrapSink(int flags, Sink<Integer> sink) { |
|
return new Sink.ChainedDouble<Integer>(sink) { |
|
@Override |
|
public void accept(double t) { |
|
downstream.accept(mapper.applyAsInt(t)); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final LongStream mapToLong(DoubleToLongFunction mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { |
|
@Override |
|
Sink<Double> opWrapSink(int flags, Sink<Long> sink) { |
|
return new Sink.ChainedDouble<Long>(sink) { |
|
@Override |
|
public void accept(double t) { |
|
downstream.accept(mapper.applyAsLong(t)); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
|
@Override |
|
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { |
|
return new Sink.ChainedDouble<Double>(sink) { |
|
|
|
boolean cancellationRequestedCalled; |
|
|
|
|
|
DoubleConsumer downstreamAsDouble = downstream::accept; |
|
|
|
@Override |
|
public void begin(long size) { |
|
downstream.begin(-1); |
|
} |
|
|
|
@Override |
|
public void accept(double t) { |
|
try (DoubleStream result = mapper.apply(t)) { |
|
if (result != null) { |
|
if (!cancellationRequestedCalled) { |
|
result.sequential().forEach(downstreamAsDouble); |
|
} |
|
else { |
|
var s = result.sequential().spliterator(); |
|
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble)); |
|
} |
|
} |
|
} |
|
} |
|
|
|
@Override |
|
public boolean cancellationRequested() { |
|
// If this method is called then an operation within the stream |
|
// pipeline is short-circuiting (see AbstractPipeline.copyInto). |
|
// Note that we cannot differentiate between an upstream or |
|
|
|
cancellationRequestedCalled = true; |
|
return downstream.cancellationRequested(); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final DoubleStream mapMulti(DoubleMapMultiConsumer mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new StatelessOp<>(this, StreamShape.DOUBLE_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
|
|
|
@Override |
|
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { |
|
return new Sink.ChainedDouble<>(sink) { |
|
|
|
@Override |
|
public void begin(long size) { |
|
downstream.begin(-1); |
|
} |
|
|
|
@Override |
|
@SuppressWarnings("unchecked") |
|
public void accept(double t) { |
|
mapper.accept(t, (DoubleConsumer) downstream); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public DoubleStream unordered() { |
|
if (!isOrdered()) |
|
return this; |
|
return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) { |
|
@Override |
|
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { |
|
return sink; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final DoubleStream filter(DoublePredicate predicate) { |
|
Objects.requireNonNull(predicate); |
|
return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, |
|
StreamOpFlag.NOT_SIZED) { |
|
@Override |
|
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { |
|
return new Sink.ChainedDouble<Double>(sink) { |
|
@Override |
|
public void begin(long size) { |
|
downstream.begin(-1); |
|
} |
|
|
|
@Override |
|
public void accept(double t) { |
|
if (predicate.test(t)) |
|
downstream.accept(t); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final DoubleStream peek(DoubleConsumer action) { |
|
Objects.requireNonNull(action); |
|
return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, |
|
0) { |
|
@Override |
|
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { |
|
return new Sink.ChainedDouble<Double>(sink) { |
|
@Override |
|
public void accept(double t) { |
|
action.accept(t); |
|
downstream.accept(t); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
// Stateful intermediate ops from DoubleStream |
|
|
|
@Override |
|
public final DoubleStream limit(long maxSize) { |
|
if (maxSize < 0) |
|
throw new IllegalArgumentException(Long.toString(maxSize)); |
|
return SliceOps.makeDouble(this, (long) 0, maxSize); |
|
} |
|
|
|
@Override |
|
public final DoubleStream skip(long n) { |
|
if (n < 0) |
|
throw new IllegalArgumentException(Long.toString(n)); |
|
if (n == 0) |
|
return this; |
|
else { |
|
long limit = -1; |
|
return SliceOps.makeDouble(this, n, limit); |
|
} |
|
} |
|
|
|
@Override |
|
public final DoubleStream takeWhile(DoublePredicate predicate) { |
|
return WhileOps.makeTakeWhileDouble(this, predicate); |
|
} |
|
|
|
@Override |
|
public final DoubleStream dropWhile(DoublePredicate predicate) { |
|
return WhileOps.makeDropWhileDouble(this, predicate); |
|
} |
|
|
|
@Override |
|
public final DoubleStream sorted() { |
|
return SortedOps.makeDouble(this); |
|
} |
|
|
|
@Override |
|
public final DoubleStream distinct() { |
|
// While functional and quick to implement, this approach is not very efficient. |
|
|
|
return boxed().distinct().mapToDouble(i -> (double) i); |
|
} |
|
|
|
// Terminal ops from DoubleStream |
|
|
|
@Override |
|
public void forEach(DoubleConsumer consumer) { |
|
evaluate(ForEachOps.makeDouble(consumer, false)); |
|
} |
|
|
|
@Override |
|
public void forEachOrdered(DoubleConsumer consumer) { |
|
evaluate(ForEachOps.makeDouble(consumer, true)); |
|
} |
|
|
|
@Override |
|
public final double sum() { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
double[] summation = collect(() -> new double[3], |
|
(ll, d) -> { |
|
Collectors.sumWithCompensation(ll, d); |
|
ll[2] += d; |
|
}, |
|
(ll, rr) -> { |
|
Collectors.sumWithCompensation(ll, rr[0]); |
|
Collectors.sumWithCompensation(ll, rr[1]); |
|
ll[2] += rr[2]; |
|
}); |
|
|
|
return Collectors.computeFinalSum(summation); |
|
} |
|
|
|
@Override |
|
public final OptionalDouble min() { |
|
return reduce(Math::min); |
|
} |
|
|
|
@Override |
|
public final OptionalDouble max() { |
|
return reduce(Math::max); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
@Override |
|
public final OptionalDouble average() { |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
double[] avg = collect(() -> new double[4], |
|
(ll, d) -> { |
|
ll[2]++; |
|
Collectors.sumWithCompensation(ll, d); |
|
ll[3] += d; |
|
}, |
|
(ll, rr) -> { |
|
Collectors.sumWithCompensation(ll, rr[0]); |
|
Collectors.sumWithCompensation(ll, rr[1]); |
|
ll[2] += rr[2]; |
|
ll[3] += rr[3]; |
|
}); |
|
return avg[2] > 0 |
|
? OptionalDouble.of(Collectors.computeFinalSum(avg) / avg[2]) |
|
: OptionalDouble.empty(); |
|
} |
|
|
|
@Override |
|
public final long count() { |
|
return evaluate(ReduceOps.makeDoubleCounting()); |
|
} |
|
|
|
@Override |
|
public final DoubleSummaryStatistics summaryStatistics() { |
|
return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept, |
|
DoubleSummaryStatistics::combine); |
|
} |
|
|
|
@Override |
|
public final double reduce(double identity, DoubleBinaryOperator op) { |
|
return evaluate(ReduceOps.makeDouble(identity, op)); |
|
} |
|
|
|
@Override |
|
public final OptionalDouble reduce(DoubleBinaryOperator op) { |
|
return evaluate(ReduceOps.makeDouble(op)); |
|
} |
|
|
|
@Override |
|
public final <R> R collect(Supplier<R> supplier, |
|
ObjDoubleConsumer<R> accumulator, |
|
BiConsumer<R, R> combiner) { |
|
Objects.requireNonNull(combiner); |
|
BinaryOperator<R> operator = (left, right) -> { |
|
combiner.accept(left, right); |
|
return left; |
|
}; |
|
return evaluate(ReduceOps.makeDouble(supplier, accumulator, operator)); |
|
} |
|
|
|
@Override |
|
public final boolean anyMatch(DoublePredicate predicate) { |
|
return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY)); |
|
} |
|
|
|
@Override |
|
public final boolean allMatch(DoublePredicate predicate) { |
|
return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL)); |
|
} |
|
|
|
@Override |
|
public final boolean noneMatch(DoublePredicate predicate) { |
|
return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE)); |
|
} |
|
|
|
@Override |
|
public final OptionalDouble findFirst() { |
|
return evaluate(FindOps.makeDouble(true)); |
|
} |
|
|
|
@Override |
|
public final OptionalDouble findAny() { |
|
return evaluate(FindOps.makeDouble(false)); |
|
} |
|
|
|
@Override |
|
public final double[] toArray() { |
|
return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new)) |
|
.asPrimitiveArray(); |
|
} |
|
|
|
// |
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
static class Head<E_IN> extends DoublePipeline<E_IN> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
Head(Supplier<? extends Spliterator<Double>> source, |
|
int sourceFlags, boolean parallel) { |
|
super(source, sourceFlags, parallel); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
Head(Spliterator<Double> source, |
|
int sourceFlags, boolean parallel) { |
|
super(source, sourceFlags, parallel); |
|
} |
|
|
|
@Override |
|
final boolean opIsStateful() { |
|
throw new UnsupportedOperationException(); |
|
} |
|
|
|
@Override |
|
final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) { |
|
throw new UnsupportedOperationException(); |
|
} |
|
|
|
// Optimized sequential terminal operations for the head of the pipeline |
|
|
|
@Override |
|
public void forEach(DoubleConsumer consumer) { |
|
if (!isParallel()) { |
|
adapt(sourceStageSpliterator()).forEachRemaining(consumer); |
|
} |
|
else { |
|
super.forEach(consumer); |
|
} |
|
} |
|
|
|
@Override |
|
public void forEachOrdered(DoubleConsumer consumer) { |
|
if (!isParallel()) { |
|
adapt(sourceStageSpliterator()).forEachRemaining(consumer); |
|
} |
|
else { |
|
super.forEachOrdered(consumer); |
|
} |
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, |
|
StreamShape inputShape, |
|
int opFlags) { |
|
super(upstream, opFlags); |
|
assert upstream.getOutputShape() == inputShape; |
|
} |
|
|
|
@Override |
|
final boolean opIsStateful() { |
|
return false; |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, |
|
StreamShape inputShape, |
|
int opFlags) { |
|
super(upstream, opFlags); |
|
assert upstream.getOutputShape() == inputShape; |
|
} |
|
|
|
@Override |
|
final boolean opIsStateful() { |
|
return true; |
|
} |
|
|
|
@Override |
|
abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, |
|
Spliterator<P_IN> spliterator, |
|
IntFunction<Double[]> generator); |
|
} |
|
} |