|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
package java.util.stream; |
|
|
|
import java.util.IntSummaryStatistics; |
|
import java.util.Objects; |
|
import java.util.OptionalDouble; |
|
import java.util.OptionalInt; |
|
import java.util.PrimitiveIterator; |
|
import java.util.Spliterator; |
|
import java.util.Spliterators; |
|
import java.util.function.BiConsumer; |
|
import java.util.function.BinaryOperator; |
|
import java.util.function.IntBinaryOperator; |
|
import java.util.function.IntConsumer; |
|
import java.util.function.IntFunction; |
|
import java.util.function.IntPredicate; |
|
import java.util.function.IntToDoubleFunction; |
|
import java.util.function.IntToLongFunction; |
|
import java.util.function.IntUnaryOperator; |
|
import java.util.function.ObjIntConsumer; |
|
import java.util.function.Supplier; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
abstract class IntPipeline<E_IN> |
|
extends AbstractPipeline<E_IN, Integer, IntStream> |
|
implements IntStream { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
IntPipeline(Supplier<? extends Spliterator<Integer>> source, |
|
int sourceFlags, boolean parallel) { |
|
super(source, sourceFlags, parallel); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
IntPipeline(Spliterator<Integer> source, |
|
int sourceFlags, boolean parallel) { |
|
super(source, sourceFlags, parallel); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { |
|
super(upstream, opFlags); |
|
} |
|
|
|
|
|
|
|
|
|
*/ |
|
private static IntConsumer adapt(Sink<Integer> sink) { |
|
if (sink instanceof IntConsumer) { |
|
return (IntConsumer) sink; |
|
} |
|
else { |
|
if (Tripwire.ENABLED) |
|
Tripwire.trip(AbstractPipeline.class, |
|
"using IntStream.adapt(Sink<Integer> s)"); |
|
return sink::accept; |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
private static Spliterator.OfInt adapt(Spliterator<Integer> s) { |
|
if (s instanceof Spliterator.OfInt) { |
|
return (Spliterator.OfInt) s; |
|
} |
|
else { |
|
if (Tripwire.ENABLED) |
|
Tripwire.trip(AbstractPipeline.class, |
|
"using IntStream.adapt(Spliterator<Integer> s)"); |
|
throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)"); |
|
} |
|
} |
|
|
|
|
|
// Shape-specific methods |
|
|
|
@Override |
|
final StreamShape getOutputShape() { |
|
return StreamShape.INT_VALUE; |
|
} |
|
|
|
@Override |
|
final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper, |
|
Spliterator<P_IN> spliterator, |
|
boolean flattenTree, |
|
IntFunction<Integer[]> generator) { |
|
return Nodes.collectInt(helper, spliterator, flattenTree); |
|
} |
|
|
|
@Override |
|
final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph, |
|
Supplier<Spliterator<P_IN>> supplier, |
|
boolean isParallel) { |
|
return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel); |
|
} |
|
|
|
@Override |
|
@SuppressWarnings("unchecked") |
|
final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) { |
|
return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier); |
|
} |
|
|
|
@Override |
|
final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) { |
|
Spliterator.OfInt spl = adapt(spliterator); |
|
IntConsumer adaptedSink = adapt(sink); |
|
do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); |
|
} |
|
|
|
@Override |
|
final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown, |
|
IntFunction<Integer[]> generator) { |
|
return Nodes.intBuilder(exactSizeIfKnown); |
|
} |
|
|
|
|
|
// IntStream |
|
|
|
@Override |
|
public final PrimitiveIterator.OfInt iterator() { |
|
return Spliterators.iterator(spliterator()); |
|
} |
|
|
|
@Override |
|
public final Spliterator.OfInt spliterator() { |
|
return adapt(super.spliterator()); |
|
} |
|
|
|
// Stateless intermediate ops from IntStream |
|
|
|
@Override |
|
public final LongStream asLongStream() { |
|
return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { |
|
@Override |
|
Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { |
|
return new Sink.ChainedInt<Long>(sink) { |
|
@Override |
|
public void accept(int t) { |
|
downstream.accept((long) t); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final DoubleStream asDoubleStream() { |
|
return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { |
|
@Override |
|
Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { |
|
return new Sink.ChainedInt<Double>(sink) { |
|
@Override |
|
public void accept(int t) { |
|
downstream.accept((double) t); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final Stream<Integer> boxed() { |
|
return mapToObj(Integer::valueOf); |
|
} |
|
|
|
@Override |
|
public final IntStream map(IntUnaryOperator mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { |
|
@Override |
|
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { |
|
return new Sink.ChainedInt<Integer>(sink) { |
|
@Override |
|
public void accept(int t) { |
|
downstream.accept(mapper.applyAsInt(t)); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { |
|
@Override |
|
Sink<Integer> opWrapSink(int flags, Sink<U> sink) { |
|
return new Sink.ChainedInt<U>(sink) { |
|
@Override |
|
public void accept(int t) { |
|
downstream.accept(mapper.apply(t)); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final LongStream mapToLong(IntToLongFunction mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { |
|
@Override |
|
Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { |
|
return new Sink.ChainedInt<Long>(sink) { |
|
@Override |
|
public void accept(int t) { |
|
downstream.accept(mapper.applyAsLong(t)); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final DoubleStream mapToDouble(IntToDoubleFunction mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { |
|
@Override |
|
Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { |
|
return new Sink.ChainedInt<Double>(sink) { |
|
@Override |
|
public void accept(int t) { |
|
downstream.accept(mapper.applyAsDouble(t)); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final IntStream flatMap(IntFunction<? extends IntStream> mapper) { |
|
Objects.requireNonNull(mapper); |
|
return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, |
|
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { |
|
@Override |
|
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { |
|
return new Sink.ChainedInt<Integer>(sink) { |
|
|
|
boolean cancellationRequestedCalled; |
|
|
|
|
|
IntConsumer downstreamAsInt = downstream::accept; |
|
|
|
@Override |
|
public void begin(long size) { |
|
downstream.begin(-1); |
|
} |
|
|
|
@Override |
|
public void accept(int t) { |
|
try (IntStream result = mapper.apply(t)) { |
|
if (result != null) { |
|
if (!cancellationRequestedCalled) { |
|
result.sequential().forEach(downstreamAsInt); |
|
} |
|
else { |
|
Spliterator.OfInt s = result.sequential().spliterator(); |
|
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt)); |
|
} |
|
} |
|
} |
|
} |
|
|
|
@Override |
|
public boolean cancellationRequested() { |
|
// If this method is called then an operation within the stream |
|
// pipeline is short-circuiting (see AbstractPipeline.copyInto). |
|
// Note that we cannot differentiate between an upstream or |
|
|
|
cancellationRequestedCalled = true; |
|
return downstream.cancellationRequested(); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public IntStream unordered() { |
|
if (!isOrdered()) |
|
return this; |
|
return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) { |
|
@Override |
|
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { |
|
return sink; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final IntStream filter(IntPredicate predicate) { |
|
Objects.requireNonNull(predicate); |
|
return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, |
|
StreamOpFlag.NOT_SIZED) { |
|
@Override |
|
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { |
|
return new Sink.ChainedInt<Integer>(sink) { |
|
@Override |
|
public void begin(long size) { |
|
downstream.begin(-1); |
|
} |
|
|
|
@Override |
|
public void accept(int t) { |
|
if (predicate.test(t)) |
|
downstream.accept(t); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
@Override |
|
public final IntStream peek(IntConsumer action) { |
|
Objects.requireNonNull(action); |
|
return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, |
|
0) { |
|
@Override |
|
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { |
|
return new Sink.ChainedInt<Integer>(sink) { |
|
@Override |
|
public void accept(int t) { |
|
action.accept(t); |
|
downstream.accept(t); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
// Stateful intermediate ops from IntStream |
|
|
|
@Override |
|
public final IntStream limit(long maxSize) { |
|
if (maxSize < 0) |
|
throw new IllegalArgumentException(Long.toString(maxSize)); |
|
return SliceOps.makeInt(this, 0, maxSize); |
|
} |
|
|
|
@Override |
|
public final IntStream skip(long n) { |
|
if (n < 0) |
|
throw new IllegalArgumentException(Long.toString(n)); |
|
if (n == 0) |
|
return this; |
|
else |
|
return SliceOps.makeInt(this, n, -1); |
|
} |
|
|
|
@Override |
|
public final IntStream sorted() { |
|
return SortedOps.makeInt(this); |
|
} |
|
|
|
@Override |
|
public final IntStream distinct() { |
|
// While functional and quick to implement, this approach is not very efficient. |
|
|
|
return boxed().distinct().mapToInt(i -> i); |
|
} |
|
|
|
// Terminal ops from IntStream |
|
|
|
@Override |
|
public void forEach(IntConsumer action) { |
|
evaluate(ForEachOps.makeInt(action, false)); |
|
} |
|
|
|
@Override |
|
public void forEachOrdered(IntConsumer action) { |
|
evaluate(ForEachOps.makeInt(action, true)); |
|
} |
|
|
|
@Override |
|
public final int sum() { |
|
return reduce(0, Integer::sum); |
|
} |
|
|
|
@Override |
|
public final OptionalInt min() { |
|
return reduce(Math::min); |
|
} |
|
|
|
@Override |
|
public final OptionalInt max() { |
|
return reduce(Math::max); |
|
} |
|
|
|
@Override |
|
public final long count() { |
|
return mapToLong(e -> 1L).sum(); |
|
} |
|
|
|
@Override |
|
public final OptionalDouble average() { |
|
long[] avg = collect(() -> new long[2], |
|
(ll, i) -> { |
|
ll[0]++; |
|
ll[1] += i; |
|
}, |
|
(ll, rr) -> { |
|
ll[0] += rr[0]; |
|
ll[1] += rr[1]; |
|
}); |
|
return avg[0] > 0 |
|
? OptionalDouble.of((double) avg[1] / avg[0]) |
|
: OptionalDouble.empty(); |
|
} |
|
|
|
@Override |
|
public final IntSummaryStatistics summaryStatistics() { |
|
return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept, |
|
IntSummaryStatistics::combine); |
|
} |
|
|
|
@Override |
|
public final int reduce(int identity, IntBinaryOperator op) { |
|
return evaluate(ReduceOps.makeInt(identity, op)); |
|
} |
|
|
|
@Override |
|
public final OptionalInt reduce(IntBinaryOperator op) { |
|
return evaluate(ReduceOps.makeInt(op)); |
|
} |
|
|
|
@Override |
|
public final <R> R collect(Supplier<R> supplier, |
|
ObjIntConsumer<R> accumulator, |
|
BiConsumer<R, R> combiner) { |
|
Objects.requireNonNull(combiner); |
|
BinaryOperator<R> operator = (left, right) -> { |
|
combiner.accept(left, right); |
|
return left; |
|
}; |
|
return evaluate(ReduceOps.makeInt(supplier, accumulator, operator)); |
|
} |
|
|
|
@Override |
|
public final boolean anyMatch(IntPredicate predicate) { |
|
return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY)); |
|
} |
|
|
|
@Override |
|
public final boolean allMatch(IntPredicate predicate) { |
|
return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL)); |
|
} |
|
|
|
@Override |
|
public final boolean noneMatch(IntPredicate predicate) { |
|
return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE)); |
|
} |
|
|
|
@Override |
|
public final OptionalInt findFirst() { |
|
return evaluate(FindOps.makeInt(true)); |
|
} |
|
|
|
@Override |
|
public final OptionalInt findAny() { |
|
return evaluate(FindOps.makeInt(false)); |
|
} |
|
|
|
@Override |
|
public final int[] toArray() { |
|
return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new)) |
|
.asPrimitiveArray(); |
|
} |
|
|
|
// |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
static class Head<E_IN> extends IntPipeline<E_IN> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
Head(Supplier<? extends Spliterator<Integer>> source, |
|
int sourceFlags, boolean parallel) { |
|
super(source, sourceFlags, parallel); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
Head(Spliterator<Integer> source, |
|
int sourceFlags, boolean parallel) { |
|
super(source, sourceFlags, parallel); |
|
} |
|
|
|
@Override |
|
final boolean opIsStateful() { |
|
throw new UnsupportedOperationException(); |
|
} |
|
|
|
@Override |
|
final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) { |
|
throw new UnsupportedOperationException(); |
|
} |
|
|
|
// Optimized sequential terminal operations for the head of the pipeline |
|
|
|
@Override |
|
public void forEach(IntConsumer action) { |
|
if (!isParallel()) { |
|
adapt(sourceStageSpliterator()).forEachRemaining(action); |
|
} |
|
else { |
|
super.forEach(action); |
|
} |
|
} |
|
|
|
@Override |
|
public void forEachOrdered(IntConsumer action) { |
|
if (!isParallel()) { |
|
adapt(sourceStageSpliterator()).forEachRemaining(action); |
|
} |
|
else { |
|
super.forEachOrdered(action); |
|
} |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, |
|
StreamShape inputShape, |
|
int opFlags) { |
|
super(upstream, opFlags); |
|
assert upstream.getOutputShape() == inputShape; |
|
} |
|
|
|
@Override |
|
final boolean opIsStateful() { |
|
return false; |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, |
|
StreamShape inputShape, |
|
int opFlags) { |
|
super(upstream, opFlags); |
|
assert upstream.getOutputShape() == inputShape; |
|
} |
|
|
|
@Override |
|
final boolean opIsStateful() { |
|
return true; |
|
} |
|
|
|
@Override |
|
abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, |
|
Spliterator<P_IN> spliterator, |
|
IntFunction<Integer[]> generator); |
|
} |
|
} |