|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
package java.util.stream; |
|
|
|
import java.util.LongSummaryStatistics; |
|
import java.util.Objects; |
|
import java.util.OptionalDouble; |
|
import java.util.OptionalLong; |
|
import java.util.PrimitiveIterator; |
|
import java.util.Spliterator; |
|
import java.util.Spliterators; |
|
import java.util.function.BiConsumer; |
|
import java.util.function.BinaryOperator; |
|
import java.util.function.IntFunction; |
|
import java.util.function.LongBinaryOperator; |
|
import java.util.function.LongConsumer; |
|
import java.util.function.LongFunction; |
|
import java.util.function.LongPredicate; |
|
import java.util.function.LongToDoubleFunction; |
|
import java.util.function.LongToIntFunction; |
|
import java.util.function.LongUnaryOperator; |
|
import java.util.function.ObjLongConsumer; |
|
import java.util.function.Supplier; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
abstract class LongPipeline<E_IN> |
|
extends AbstractPipeline<E_IN, Long, LongStream> |
|
implements LongStream { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
LongPipeline(Supplier<? extends Spliterator<Long>> source, |
|
int sourceFlags, boolean parallel) { |
|
super(source, sourceFlags, parallel); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
LongPipeline(Spliterator<Long> source, |
|
int sourceFlags, boolean parallel) { |
|
super(source, sourceFlags, parallel); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { |
|
super(upstream, opFlags); |
|
} |
|
|
|
|
|
|
|
|
|
*/ |
|
private static LongConsumer adapt(Sink<Long> sink) { |
|
if (sink instanceof LongConsumer) { |
|
return (LongConsumer) sink; |
|
} else { |
|
if (Tripwire.ENABLED) |
|
Tripwire.trip(AbstractPipeline.class, |
|
"using LongStream.adapt(Sink<Long> s)"); |
|
return sink::accept; |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
private static Spliterator.OfLong adapt(Spliterator<Long> s) { |
|
if (s instanceof Spliterator.OfLong) { |
|
return (Spliterator.OfLong) s; |
|
} else { |
|
if (Tripwire.ENABLED) |
|
Tripwire.trip(AbstractPipeline.class, |
|
"using LongStream.adapt(Spliterator<Long> s)"); |
|
throw new UnsupportedOperationException("LongStream.adapt(Spliterator<Long> s)"); |
|
} |
|
} |
|
|
|
|
|
// Shape-specific methods |
|
|
|
@Override |
|
final StreamShape getOutputShape() { |
|
return StreamShape.LONG_VALUE; |
|
} |
|
|
|
@Override |
|
final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper, |
|
Spliterator<P_IN> spliterator, |
|
boolean flattenTree, |
|
IntFunction<Long[]> generator) { |
|
return Nodes.collectLong(helper, spliterator, flattenTree); |
|
} |
|
|
|
@Override |
|
final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph, |
|
Supplier<Spliterator<P_IN>> supplier, |
|
boolean isParallel) { |
|
return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel); |
|
} |
|
|
|
@Override |
|
@SuppressWarnings("unchecked") |
|
final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) { |
|
return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier); |
|
} |
|
|
|
@Override |
|
final boolean forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) { |
|
Spliterator.OfLong spl = adapt(spliterator); |
|
LongConsumer adaptedSink = adapt(sink); |
|
boolean cancelled; |
|
do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink)); |
|
return cancelled; |
|
} |
|
|
|
@Override |
|
final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) { |
|
return Nodes.longBuilder(exactSizeIfKnown); |
|
} |
|
|
|
private <U> Stream<U> mapToObj(LongFunction<? extends U> mapper, int opFlags) { |
|
return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE, opFlags) { |
|
@Override |
|
Sink<Long> opWrapSink(int flags, Sink<U> sink) { |
|
return new Sink.ChainedLong<U>(sink) { |
|
@Override |
|
public void accept(long t) { |
|
downstream.accept(mapper.apply(t)); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
// LongStream |
|
|
|
@Override |
|
public final PrimitiveIterator.OfLong iterator() { |
|
return Spliterators.iterator(spliterator()); |
|
} |
|
|
|
@Override |
|
public final Spliterator.OfLong spliterator() { |
|
return adapt(super.spliterator()); |
|
} |
|
|
|
// Stateless intermediate ops from LongStream |
|
|
|
@Override |
|
public final DoubleStream asDoubleStream() { |
|
return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_DISTINCT) { |
|
@Override |
|
Sink<Long> opWrapSink(int flags, Sink<Double> sink) { |
|
return new Sink.ChainedLong<Double>(sink) { |
|
@Override |
|
public void accept(long t) { |
|
downstream.accept((double) t); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final Stream<Long> boxed() { |
|
return mapToObj(Long::valueOf, 0); |
|
} |
|
|
|
@Override |
|
public final LongStream map(LongUnaryOperator mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { |
|
@Override |
|
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { |
|
return new Sink.ChainedLong<Long>(sink) { |
|
@Override |
|
public void accept(long t) { |
|
downstream.accept(mapper.applyAsLong(t)); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) { |
|
Objects.requireNonNull(mapper); |
|
return mapToObj(mapper, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT); |
|
} |
|
|
|
@Override |
|
public final IntStream mapToInt(LongToIntFunction mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { |
|
@Override |
|
Sink<Long> opWrapSink(int flags, Sink<Integer> sink) { |
|
return new Sink.ChainedLong<Integer>(sink) { |
|
@Override |
|
public void accept(long t) { |
|
downstream.accept(mapper.applyAsInt(t)); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final DoubleStream mapToDouble(LongToDoubleFunction mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { |
|
@Override |
|
Sink<Long> opWrapSink(int flags, Sink<Double> sink) { |
|
return new Sink.ChainedLong<Double>(sink) { |
|
@Override |
|
public void accept(long t) { |
|
downstream.accept(mapper.applyAsDouble(t)); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final LongStream flatMap(LongFunction<? extends LongStream> mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
|
@Override |
|
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { |
|
return new Sink.ChainedLong<Long>(sink) { |
|
|
|
boolean cancellationRequestedCalled; |
|
|
|
|
|
LongConsumer downstreamAsLong = downstream::accept; |
|
|
|
@Override |
|
public void begin(long size) { |
|
downstream.begin(-1); |
|
} |
|
|
|
@Override |
|
public void accept(long t) { |
|
try (LongStream result = mapper.apply(t)) { |
|
if (result != null) { |
|
if (!cancellationRequestedCalled) { |
|
result.sequential().forEach(downstreamAsLong); |
|
} |
|
else { |
|
var s = result.sequential().spliterator(); |
|
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong)); |
|
} |
|
} |
|
} |
|
} |
|
|
|
@Override |
|
public boolean cancellationRequested() { |
|
// If this method is called then an operation within the stream |
|
// pipeline is short-circuiting (see AbstractPipeline.copyInto). |
|
// Note that we cannot differentiate between an upstream or |
|
|
|
cancellationRequestedCalled = true; |
|
return downstream.cancellationRequested(); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public LongStream unordered() { |
|
if (!isOrdered()) |
|
return this; |
|
return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) { |
|
@Override |
|
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { |
|
return sink; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final LongStream filter(LongPredicate predicate) { |
|
Objects.requireNonNull(predicate); |
|
return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, |
|
StreamOpFlag.NOT_SIZED) { |
|
@Override |
|
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { |
|
return new Sink.ChainedLong<Long>(sink) { |
|
@Override |
|
public void begin(long size) { |
|
downstream.begin(-1); |
|
} |
|
|
|
@Override |
|
public void accept(long t) { |
|
if (predicate.test(t)) |
|
downstream.accept(t); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final LongStream peek(LongConsumer action) { |
|
Objects.requireNonNull(action); |
|
return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, |
|
0) { |
|
@Override |
|
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { |
|
return new Sink.ChainedLong<Long>(sink) { |
|
@Override |
|
public void accept(long t) { |
|
action.accept(t); |
|
downstream.accept(t); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
// Stateful intermediate ops from LongStream |
|
|
|
@Override |
|
public final LongStream limit(long maxSize) { |
|
if (maxSize < 0) |
|
throw new IllegalArgumentException(Long.toString(maxSize)); |
|
return SliceOps.makeLong(this, 0, maxSize); |
|
} |
|
|
|
@Override |
|
public final LongStream skip(long n) { |
|
if (n < 0) |
|
throw new IllegalArgumentException(Long.toString(n)); |
|
if (n == 0) |
|
return this; |
|
else |
|
return SliceOps.makeLong(this, n, -1); |
|
} |
|
|
|
@Override |
|
public final LongStream takeWhile(LongPredicate predicate) { |
|
return WhileOps.makeTakeWhileLong(this, predicate); |
|
} |
|
|
|
@Override |
|
public final LongStream dropWhile(LongPredicate predicate) { |
|
return WhileOps.makeDropWhileLong(this, predicate); |
|
} |
|
|
|
@Override |
|
public final LongStream sorted() { |
|
return SortedOps.makeLong(this); |
|
} |
|
|
|
@Override |
|
public final LongStream distinct() { |
|
// While functional and quick to implement, this approach is not very efficient. |
|
|
|
return boxed().distinct().mapToLong(i -> (long) i); |
|
} |
|
|
|
// Terminal ops from LongStream |
|
|
|
@Override |
|
public void forEach(LongConsumer action) { |
|
evaluate(ForEachOps.makeLong(action, false)); |
|
} |
|
|
|
@Override |
|
public void forEachOrdered(LongConsumer action) { |
|
evaluate(ForEachOps.makeLong(action, true)); |
|
} |
|
|
|
@Override |
|
public final long sum() { |
|
|
|
return reduce(0, Long::sum); |
|
} |
|
|
|
@Override |
|
public final OptionalLong min() { |
|
return reduce(Math::min); |
|
} |
|
|
|
@Override |
|
public final OptionalLong max() { |
|
return reduce(Math::max); |
|
} |
|
|
|
@Override |
|
public final OptionalDouble average() { |
|
long[] avg = collect(() -> new long[2], |
|
(ll, i) -> { |
|
ll[0]++; |
|
ll[1] += i; |
|
}, |
|
(ll, rr) -> { |
|
ll[0] += rr[0]; |
|
ll[1] += rr[1]; |
|
}); |
|
return avg[0] > 0 |
|
? OptionalDouble.of((double) avg[1] / avg[0]) |
|
: OptionalDouble.empty(); |
|
} |
|
|
|
@Override |
|
public final long count() { |
|
return evaluate(ReduceOps.makeLongCounting()); |
|
} |
|
|
|
@Override |
|
public final LongSummaryStatistics summaryStatistics() { |
|
return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept, |
|
LongSummaryStatistics::combine); |
|
} |
|
|
|
@Override |
|
public final long reduce(long identity, LongBinaryOperator op) { |
|
return evaluate(ReduceOps.makeLong(identity, op)); |
|
} |
|
|
|
@Override |
|
public final OptionalLong reduce(LongBinaryOperator op) { |
|
return evaluate(ReduceOps.makeLong(op)); |
|
} |
|
|
|
@Override |
|
public final <R> R collect(Supplier<R> supplier, |
|
ObjLongConsumer<R> accumulator, |
|
BiConsumer<R, R> combiner) { |
|
Objects.requireNonNull(combiner); |
|
BinaryOperator<R> operator = (left, right) -> { |
|
combiner.accept(left, right); |
|
return left; |
|
}; |
|
return evaluate(ReduceOps.makeLong(supplier, accumulator, operator)); |
|
} |
|
|
|
@Override |
|
public final boolean anyMatch(LongPredicate predicate) { |
|
return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY)); |
|
} |
|
|
|
@Override |
|
public final boolean allMatch(LongPredicate predicate) { |
|
return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL)); |
|
} |
|
|
|
@Override |
|
public final boolean noneMatch(LongPredicate predicate) { |
|
return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE)); |
|
} |
|
|
|
@Override |
|
public final OptionalLong findFirst() { |
|
return evaluate(FindOps.makeLong(true)); |
|
} |
|
|
|
@Override |
|
public final OptionalLong findAny() { |
|
return evaluate(FindOps.makeLong(false)); |
|
} |
|
|
|
@Override |
|
public final long[] toArray() { |
|
return Nodes.flattenLong((Node.OfLong) evaluateToArrayNode(Long[]::new)) |
|
.asPrimitiveArray(); |
|
} |
|
|
|
|
|
// |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
static class Head<E_IN> extends LongPipeline<E_IN> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
Head(Supplier<? extends Spliterator<Long>> source, |
|
int sourceFlags, boolean parallel) { |
|
super(source, sourceFlags, parallel); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
Head(Spliterator<Long> source, |
|
int sourceFlags, boolean parallel) { |
|
super(source, sourceFlags, parallel); |
|
} |
|
|
|
@Override |
|
final boolean opIsStateful() { |
|
throw new UnsupportedOperationException(); |
|
} |
|
|
|
@Override |
|
final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) { |
|
throw new UnsupportedOperationException(); |
|
} |
|
|
|
// Optimized sequential terminal operations for the head of the pipeline |
|
|
|
@Override |
|
public void forEach(LongConsumer action) { |
|
if (!isParallel()) { |
|
adapt(sourceStageSpliterator()).forEachRemaining(action); |
|
} else { |
|
super.forEach(action); |
|
} |
|
} |
|
|
|
@Override |
|
public void forEachOrdered(LongConsumer action) { |
|
if (!isParallel()) { |
|
adapt(sourceStageSpliterator()).forEachRemaining(action); |
|
} else { |
|
super.forEachOrdered(action); |
|
} |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, |
|
StreamShape inputShape, |
|
int opFlags) { |
|
super(upstream, opFlags); |
|
assert upstream.getOutputShape() == inputShape; |
|
} |
|
|
|
@Override |
|
final boolean opIsStateful() { |
|
return false; |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, |
|
StreamShape inputShape, |
|
int opFlags) { |
|
super(upstream, opFlags); |
|
assert upstream.getOutputShape() == inputShape; |
|
} |
|
|
|
@Override |
|
final boolean opIsStateful() { |
|
return true; |
|
} |
|
|
|
@Override |
|
abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, |
|
Spliterator<P_IN> spliterator, |
|
IntFunction<Long[]> generator); |
|
} |
|
} |