|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
package java.util.stream; |
|
|
|
import java.util.Comparator; |
|
import java.util.Objects; |
|
import java.util.Spliterator; |
|
import java.util.concurrent.CountedCompleter; |
|
import java.util.concurrent.atomic.AtomicBoolean; |
|
import java.util.function.Consumer; |
|
import java.util.function.DoubleConsumer; |
|
import java.util.function.DoublePredicate; |
|
import java.util.function.IntConsumer; |
|
import java.util.function.IntFunction; |
|
import java.util.function.IntPredicate; |
|
import java.util.function.LongConsumer; |
|
import java.util.function.LongPredicate; |
|
import java.util.function.Predicate; |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
final class WhileOps { |
|
|
|
static final int TAKE_FLAGS = StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SHORT_CIRCUIT; |
|
|
|
static final int DROP_FLAGS = StreamOpFlag.NOT_SIZED; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
static <T> Stream<T> makeTakeWhileRef(AbstractPipeline<?, T, ?> upstream, |
|
Predicate<? super T> predicate) { |
|
Objects.requireNonNull(predicate); |
|
return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, TAKE_FLAGS) { |
|
@Override |
|
<P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, |
|
Spliterator<P_IN> spliterator) { |
|
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
|
return opEvaluateParallel(helper, spliterator, Nodes.castingArray()) |
|
.spliterator(); |
|
} |
|
else { |
|
return new UnorderedWhileSpliterator.OfRef.Taking<>( |
|
helper.wrapSpliterator(spliterator), false, predicate); |
|
} |
|
} |
|
|
|
@Override |
|
<P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, |
|
Spliterator<P_IN> spliterator, |
|
IntFunction<T[]> generator) { |
|
return new TakeWhileTask<>(this, helper, spliterator, generator) |
|
.invoke(); |
|
} |
|
|
|
@Override |
|
Sink<T> opWrapSink(int flags, Sink<T> sink) { |
|
return new Sink.ChainedReference<T, T>(sink) { |
|
boolean take = true; |
|
|
|
@Override |
|
public void begin(long size) { |
|
downstream.begin(-1); |
|
} |
|
|
|
@Override |
|
public void accept(T t) { |
|
if (take && (take = predicate.test(t))) { |
|
downstream.accept(t); |
|
} |
|
} |
|
|
|
@Override |
|
public boolean cancellationRequested() { |
|
return !take || downstream.cancellationRequested(); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
static IntStream makeTakeWhileInt(AbstractPipeline<?, Integer, ?> upstream, |
|
IntPredicate predicate) { |
|
Objects.requireNonNull(predicate); |
|
return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE, TAKE_FLAGS) { |
|
@Override |
|
<P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper, |
|
Spliterator<P_IN> spliterator) { |
|
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
|
return opEvaluateParallel(helper, spliterator, Integer[]::new) |
|
.spliterator(); |
|
} |
|
else { |
|
return new UnorderedWhileSpliterator.OfInt.Taking( |
|
(Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate); |
|
} |
|
} |
|
|
|
@Override |
|
<P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, |
|
Spliterator<P_IN> spliterator, |
|
IntFunction<Integer[]> generator) { |
|
return new TakeWhileTask<>(this, helper, spliterator, generator) |
|
.invoke(); |
|
} |
|
|
|
@Override |
|
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { |
|
return new Sink.ChainedInt<Integer>(sink) { |
|
boolean take = true; |
|
|
|
@Override |
|
public void begin(long size) { |
|
downstream.begin(-1); |
|
} |
|
|
|
@Override |
|
public void accept(int t) { |
|
if (take && (take = predicate.test(t))) { |
|
downstream.accept(t); |
|
} |
|
} |
|
|
|
@Override |
|
public boolean cancellationRequested() { |
|
return !take || downstream.cancellationRequested(); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
static LongStream makeTakeWhileLong(AbstractPipeline<?, Long, ?> upstream, |
|
LongPredicate predicate) { |
|
Objects.requireNonNull(predicate); |
|
return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE, TAKE_FLAGS) { |
|
@Override |
|
<P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper, |
|
Spliterator<P_IN> spliterator) { |
|
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
|
return opEvaluateParallel(helper, spliterator, Long[]::new) |
|
.spliterator(); |
|
} |
|
else { |
|
return new UnorderedWhileSpliterator.OfLong.Taking( |
|
(Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate); |
|
} |
|
} |
|
|
|
@Override |
|
<P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, |
|
Spliterator<P_IN> spliterator, |
|
IntFunction<Long[]> generator) { |
|
return new TakeWhileTask<>(this, helper, spliterator, generator) |
|
.invoke(); |
|
} |
|
|
|
@Override |
|
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { |
|
return new Sink.ChainedLong<Long>(sink) { |
|
boolean take = true; |
|
|
|
@Override |
|
public void begin(long size) { |
|
downstream.begin(-1); |
|
} |
|
|
|
@Override |
|
public void accept(long t) { |
|
if (take && (take = predicate.test(t))) { |
|
downstream.accept(t); |
|
} |
|
} |
|
|
|
@Override |
|
public boolean cancellationRequested() { |
|
return !take || downstream.cancellationRequested(); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
static DoubleStream makeTakeWhileDouble(AbstractPipeline<?, Double, ?> upstream, |
|
DoublePredicate predicate) { |
|
Objects.requireNonNull(predicate); |
|
return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE, TAKE_FLAGS) { |
|
@Override |
|
<P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper, |
|
Spliterator<P_IN> spliterator) { |
|
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
|
return opEvaluateParallel(helper, spliterator, Double[]::new) |
|
.spliterator(); |
|
} |
|
else { |
|
return new UnorderedWhileSpliterator.OfDouble.Taking( |
|
(Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate); |
|
} |
|
} |
|
|
|
@Override |
|
<P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, |
|
Spliterator<P_IN> spliterator, |
|
IntFunction<Double[]> generator) { |
|
return new TakeWhileTask<>(this, helper, spliterator, generator) |
|
.invoke(); |
|
} |
|
|
|
@Override |
|
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { |
|
return new Sink.ChainedDouble<Double>(sink) { |
|
boolean take = true; |
|
|
|
@Override |
|
public void begin(long size) { |
|
downstream.begin(-1); |
|
} |
|
|
|
@Override |
|
public void accept(double t) { |
|
if (take && (take = predicate.test(t))) { |
|
downstream.accept(t); |
|
} |
|
} |
|
|
|
@Override |
|
public boolean cancellationRequested() { |
|
return !take || downstream.cancellationRequested(); |
|
} |
|
}; |
|
} |
|
}; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
interface DropWhileOp<T> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
interface DropWhileSink<T> extends Sink<T> { |
|
|
|
|
|
|
|
*/ |
|
long getDropCount(); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
static <T> Stream<T> makeDropWhileRef(AbstractPipeline<?, T, ?> upstream, |
|
Predicate<? super T> predicate) { |
|
Objects.requireNonNull(predicate); |
|
|
|
class Op extends ReferencePipeline.StatefulOp<T, T> implements DropWhileOp<T> { |
|
public Op(AbstractPipeline<?, T, ?> upstream, StreamShape inputShape, int opFlags) { |
|
super(upstream, inputShape, opFlags); |
|
} |
|
|
|
@Override |
|
<P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, |
|
Spliterator<P_IN> spliterator) { |
|
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
|
return opEvaluateParallel(helper, spliterator, Nodes.castingArray()) |
|
.spliterator(); |
|
} |
|
else { |
|
return new UnorderedWhileSpliterator.OfRef.Dropping<>( |
|
helper.wrapSpliterator(spliterator), false, predicate); |
|
} |
|
} |
|
|
|
@Override |
|
<P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, |
|
Spliterator<P_IN> spliterator, |
|
IntFunction<T[]> generator) { |
|
return new DropWhileTask<>(this, helper, spliterator, generator) |
|
.invoke(); |
|
} |
|
|
|
@Override |
|
Sink<T> opWrapSink(int flags, Sink<T> sink) { |
|
return opWrapSink(sink, false); |
|
} |
|
|
|
public DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements) { |
|
class OpSink extends Sink.ChainedReference<T, T> implements DropWhileSink<T> { |
|
long dropCount; |
|
boolean take; |
|
|
|
OpSink() { |
|
super(sink); |
|
} |
|
|
|
@Override |
|
public void accept(T t) { |
|
boolean takeElement = take || (take = !predicate.test(t)); |
|
|
|
// If ordered and element is dropped increment index |
|
|
|
if (retainAndCountDroppedElements && !takeElement) |
|
dropCount++; |
|
|
|
// If ordered need to process element, otherwise |
|
|
|
if (retainAndCountDroppedElements || takeElement) |
|
downstream.accept(t); |
|
} |
|
|
|
@Override |
|
public long getDropCount() { |
|
return dropCount; |
|
} |
|
} |
|
return new OpSink(); |
|
} |
|
} |
|
return new Op(upstream, StreamShape.REFERENCE, DROP_FLAGS); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
static IntStream makeDropWhileInt(AbstractPipeline<?, Integer, ?> upstream, |
|
IntPredicate predicate) { |
|
Objects.requireNonNull(predicate); |
|
class Op extends IntPipeline.StatefulOp<Integer> implements DropWhileOp<Integer> { |
|
public Op(AbstractPipeline<?, Integer, ?> upstream, StreamShape inputShape, int opFlags) { |
|
super(upstream, inputShape, opFlags); |
|
} |
|
|
|
@Override |
|
<P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper, |
|
Spliterator<P_IN> spliterator) { |
|
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
|
return opEvaluateParallel(helper, spliterator, Integer[]::new) |
|
.spliterator(); |
|
} |
|
else { |
|
return new UnorderedWhileSpliterator.OfInt.Dropping( |
|
(Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate); |
|
} |
|
} |
|
|
|
@Override |
|
<P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, |
|
Spliterator<P_IN> spliterator, |
|
IntFunction<Integer[]> generator) { |
|
return new DropWhileTask<>(this, helper, spliterator, generator) |
|
.invoke(); |
|
} |
|
|
|
@Override |
|
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { |
|
return opWrapSink(sink, false); |
|
} |
|
|
|
public DropWhileSink<Integer> opWrapSink(Sink<Integer> sink, boolean retainAndCountDroppedElements) { |
|
class OpSink extends Sink.ChainedInt<Integer> implements DropWhileSink<Integer> { |
|
long dropCount; |
|
boolean take; |
|
|
|
OpSink() { |
|
super(sink); |
|
} |
|
|
|
@Override |
|
public void accept(int t) { |
|
boolean takeElement = take || (take = !predicate.test(t)); |
|
|
|
// If ordered and element is dropped increment index |
|
|
|
if (retainAndCountDroppedElements && !takeElement) |
|
dropCount++; |
|
|
|
// If ordered need to process element, otherwise |
|
|
|
if (retainAndCountDroppedElements || takeElement) |
|
downstream.accept(t); |
|
} |
|
|
|
@Override |
|
public long getDropCount() { |
|
return dropCount; |
|
} |
|
} |
|
return new OpSink(); |
|
} |
|
} |
|
return new Op(upstream, StreamShape.INT_VALUE, DROP_FLAGS); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
static LongStream makeDropWhileLong(AbstractPipeline<?, Long, ?> upstream, |
|
LongPredicate predicate) { |
|
Objects.requireNonNull(predicate); |
|
class Op extends LongPipeline.StatefulOp<Long> implements DropWhileOp<Long> { |
|
public Op(AbstractPipeline<?, Long, ?> upstream, StreamShape inputShape, int opFlags) { |
|
super(upstream, inputShape, opFlags); |
|
} |
|
|
|
@Override |
|
<P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper, |
|
Spliterator<P_IN> spliterator) { |
|
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
|
return opEvaluateParallel(helper, spliterator, Long[]::new) |
|
.spliterator(); |
|
} |
|
else { |
|
return new UnorderedWhileSpliterator.OfLong.Dropping( |
|
(Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate); |
|
} |
|
} |
|
|
|
@Override |
|
<P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, |
|
Spliterator<P_IN> spliterator, |
|
IntFunction<Long[]> generator) { |
|
return new DropWhileTask<>(this, helper, spliterator, generator) |
|
.invoke(); |
|
} |
|
|
|
@Override |
|
Sink<Long> opWrapSink(int flags, Sink<Long> sink) { |
|
return opWrapSink(sink, false); |
|
} |
|
|
|
public DropWhileSink<Long> opWrapSink(Sink<Long> sink, boolean retainAndCountDroppedElements) { |
|
class OpSink extends Sink.ChainedLong<Long> implements DropWhileSink<Long> { |
|
long dropCount; |
|
boolean take; |
|
|
|
OpSink() { |
|
super(sink); |
|
} |
|
|
|
@Override |
|
public void accept(long t) { |
|
boolean takeElement = take || (take = !predicate.test(t)); |
|
|
|
// If ordered and element is dropped increment index |
|
|
|
if (retainAndCountDroppedElements && !takeElement) |
|
dropCount++; |
|
|
|
// If ordered need to process element, otherwise |
|
|
|
if (retainAndCountDroppedElements || takeElement) |
|
downstream.accept(t); |
|
} |
|
|
|
@Override |
|
public long getDropCount() { |
|
return dropCount; |
|
} |
|
} |
|
return new OpSink(); |
|
} |
|
} |
|
return new Op(upstream, StreamShape.LONG_VALUE, DROP_FLAGS); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
static DoubleStream makeDropWhileDouble(AbstractPipeline<?, Double, ?> upstream, |
|
DoublePredicate predicate) { |
|
Objects.requireNonNull(predicate); |
|
class Op extends DoublePipeline.StatefulOp<Double> implements DropWhileOp<Double> { |
|
public Op(AbstractPipeline<?, Double, ?> upstream, StreamShape inputShape, int opFlags) { |
|
super(upstream, inputShape, opFlags); |
|
} |
|
|
|
@Override |
|
<P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper, |
|
Spliterator<P_IN> spliterator) { |
|
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { |
|
return opEvaluateParallel(helper, spliterator, Double[]::new) |
|
.spliterator(); |
|
} |
|
else { |
|
return new UnorderedWhileSpliterator.OfDouble.Dropping( |
|
(Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate); |
|
} |
|
} |
|
|
|
@Override |
|
<P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, |
|
Spliterator<P_IN> spliterator, |
|
IntFunction<Double[]> generator) { |
|
return new DropWhileTask<>(this, helper, spliterator, generator) |
|
.invoke(); |
|
} |
|
|
|
@Override |
|
Sink<Double> opWrapSink(int flags, Sink<Double> sink) { |
|
return opWrapSink(sink, false); |
|
} |
|
|
|
public DropWhileSink<Double> opWrapSink(Sink<Double> sink, boolean retainAndCountDroppedElements) { |
|
class OpSink extends Sink.ChainedDouble<Double> implements DropWhileSink<Double> { |
|
long dropCount; |
|
boolean take; |
|
|
|
OpSink() { |
|
super(sink); |
|
} |
|
|
|
@Override |
|
public void accept(double t) { |
|
boolean takeElement = take || (take = !predicate.test(t)); |
|
|
|
// If ordered and element is dropped increment index |
|
|
|
if (retainAndCountDroppedElements && !takeElement) |
|
dropCount++; |
|
|
|
// If ordered need to process element, otherwise |
|
|
|
if (retainAndCountDroppedElements || takeElement) |
|
downstream.accept(t); |
|
} |
|
|
|
@Override |
|
public long getDropCount() { |
|
return dropCount; |
|
} |
|
} |
|
return new OpSink(); |
|
} |
|
} |
|
return new Op(upstream, StreamShape.DOUBLE_VALUE, DROP_FLAGS); |
|
} |
|
|
|
// |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
abstract static class UnorderedWhileSpliterator<T, T_SPLITR extends Spliterator<T>> implements Spliterator<T> { |
|
|
|
static final int CANCEL_CHECK_COUNT = (1 << 6) - 1; |
|
|
|
|
|
final T_SPLITR s; |
|
// True if no splitting should be performed, if true then |
|
// this spliterator may be used for an underlying spliterator whose |
|
// covered elements have an encounter order |
|
|
|
final boolean noSplitting; |
|
// True when operations are cancelled for all related spliterators |
|
// For taking, spliterators cannot split or traversed |
|
|
|
final AtomicBoolean cancel; |
|
|
|
boolean takeOrDrop = true; |
|
|
|
int count; |
|
|
|
UnorderedWhileSpliterator(T_SPLITR s, boolean noSplitting) { |
|
this.s = s; |
|
this.noSplitting = noSplitting; |
|
this.cancel = new AtomicBoolean(); |
|
} |
|
|
|
UnorderedWhileSpliterator(T_SPLITR s, UnorderedWhileSpliterator<T, T_SPLITR> parent) { |
|
this.s = s; |
|
this.noSplitting = parent.noSplitting; |
|
this.cancel = parent.cancel; |
|
} |
|
|
|
@Override |
|
public long estimateSize() { |
|
return s.estimateSize(); |
|
} |
|
|
|
@Override |
|
public int characteristics() { |
|
|
|
return s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED); |
|
} |
|
|
|
@Override |
|
public long getExactSizeIfKnown() { |
|
return -1L; |
|
} |
|
|
|
@Override |
|
public Comparator<? super T> getComparator() { |
|
return s.getComparator(); |
|
} |
|
|
|
@Override |
|
public T_SPLITR trySplit() { |
|
@SuppressWarnings("unchecked") |
|
T_SPLITR ls = noSplitting ? null : (T_SPLITR) s.trySplit(); |
|
return ls != null ? makeSpliterator(ls) : null; |
|
} |
|
|
|
boolean checkCancelOnCount() { |
|
return count != 0 || !cancel.get(); |
|
} |
|
|
|
abstract T_SPLITR makeSpliterator(T_SPLITR s); |
|
|
|
abstract static class OfRef<T> extends UnorderedWhileSpliterator<T, Spliterator<T>> implements Consumer<T> { |
|
final Predicate<? super T> p; |
|
T t; |
|
|
|
OfRef(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) { |
|
super(s, noSplitting); |
|
this.p = p; |
|
} |
|
|
|
OfRef(Spliterator<T> s, OfRef<T> parent) { |
|
super(s, parent); |
|
this.p = parent.p; |
|
} |
|
|
|
@Override |
|
public void accept(T t) { |
|
count = (count + 1) & CANCEL_CHECK_COUNT; |
|
this.t = t; |
|
} |
|
|
|
static final class Taking<T> extends OfRef<T> { |
|
Taking(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) { |
|
super(s, noSplitting, p); |
|
} |
|
|
|
Taking(Spliterator<T> s, Taking<T> parent) { |
|
super(s, parent); |
|
} |
|
|
|
@Override |
|
public boolean tryAdvance(Consumer<? super T> action) { |
|
boolean test = true; |
|
if (takeOrDrop && |
|
checkCancelOnCount() && |
|
s.tryAdvance(this) && |
|
(test = p.test(t))) { // and test on element passes |
|
action.accept(t); |
|
return true; |
|
} |
|
else { |
|
|
|
takeOrDrop = false; |
|
// Cancel all further traversal and splitting operations |
|
|
|
if (!test) |
|
cancel.set(true); |
|
return false; |
|
} |
|
} |
|
|
|
@Override |
|
public Spliterator<T> trySplit() { |
|
|
|
return cancel.get() ? null : super.trySplit(); |
|
} |
|
|
|
@Override |
|
Spliterator<T> makeSpliterator(Spliterator<T> s) { |
|
return new Taking<>(s, this); |
|
} |
|
} |
|
|
|
static final class Dropping<T> extends OfRef<T> { |
|
Dropping(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) { |
|
super(s, noSplitting, p); |
|
} |
|
|
|
Dropping(Spliterator<T> s, Dropping<T> parent) { |
|
super(s, parent); |
|
} |
|
|
|
@Override |
|
public boolean tryAdvance(Consumer<? super T> action) { |
|
if (takeOrDrop) { |
|
takeOrDrop = false; |
|
boolean adv; |
|
boolean dropped = false; |
|
while ((adv = s.tryAdvance(this)) && |
|
checkCancelOnCount() && |
|
p.test(t)) { // and test on element passes |
|
dropped = true; |
|
} |
|
|
|
|
|
if (adv) { |
|
// Cancel all further dropping if one or more elements |
|
|
|
if (dropped) |
|
cancel.set(true); |
|
action.accept(t); |
|
} |
|
return adv; |
|
} |
|
else { |
|
return s.tryAdvance(action); |
|
} |
|
} |
|
|
|
@Override |
|
Spliterator<T> makeSpliterator(Spliterator<T> s) { |
|
return new Dropping<>(s, this); |
|
} |
|
} |
|
} |
|
|
|
abstract static class OfInt extends UnorderedWhileSpliterator<Integer, Spliterator.OfInt> implements IntConsumer, Spliterator.OfInt { |
|
final IntPredicate p; |
|
int t; |
|
|
|
OfInt(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) { |
|
super(s, noSplitting); |
|
this.p = p; |
|
} |
|
|
|
OfInt(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) { |
|
super(s, parent); |
|
this.p = parent.p; |
|
} |
|
|
|
@Override |
|
public void accept(int t) { |
|
count = (count + 1) & CANCEL_CHECK_COUNT; |
|
this.t = t; |
|
} |
|
|
|
static final class Taking extends UnorderedWhileSpliterator.OfInt { |
|
Taking(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) { |
|
super(s, noSplitting, p); |
|
} |
|
|
|
Taking(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) { |
|
super(s, parent); |
|
} |
|
|
|
@Override |
|
public boolean tryAdvance(IntConsumer action) { |
|
boolean test = true; |
|
if (takeOrDrop && |
|
checkCancelOnCount() && |
|
s.tryAdvance(this) && |
|
(test = p.test(t))) { // and test on element passes |
|
action.accept(t); |
|
return true; |
|
} |
|
else { |
|
|
|
takeOrDrop = false; |
|
// Cancel all further traversal and splitting operations |
|
|
|
if (!test) |
|
cancel.set(true); |
|
return false; |
|
} |
|
} |
|
|
|
@Override |
|
public Spliterator.OfInt trySplit() { |
|
|
|
return cancel.get() ? null : super.trySplit(); |
|
} |
|
|
|
@Override |
|
Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) { |
|
return new Taking(s, this); |
|
} |
|
} |
|
|
|
static final class Dropping extends UnorderedWhileSpliterator.OfInt { |
|
Dropping(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) { |
|
super(s, noSplitting, p); |
|
} |
|
|
|
Dropping(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) { |
|
super(s, parent); |
|
} |
|
|
|
@Override |
|
public boolean tryAdvance(IntConsumer action) { |
|
if (takeOrDrop) { |
|
takeOrDrop = false; |
|
boolean adv; |
|
boolean dropped = false; |
|
while ((adv = s.tryAdvance(this)) && |
|
checkCancelOnCount() && |
|
p.test(t)) { // and test on element passes |
|
dropped = true; |
|
} |
|
|
|
|
|
if (adv) { |
|
// Cancel all further dropping if one or more elements |
|
|
|
if (dropped) |
|
cancel.set(true); |
|
action.accept(t); |
|
} |
|
return adv; |
|
} |
|
else { |
|
return s.tryAdvance(action); |
|
} |
|
} |
|
|
|
@Override |
|
Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) { |
|
return new Dropping(s, this); |
|
} |
|
} |
|
} |
|
|
|
abstract static class OfLong extends UnorderedWhileSpliterator<Long, Spliterator.OfLong> implements LongConsumer, Spliterator.OfLong { |
|
final LongPredicate p; |
|
long t; |
|
|
|
OfLong(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) { |
|
super(s, noSplitting); |
|
this.p = p; |
|
} |
|
|
|
OfLong(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) { |
|
super(s, parent); |
|
this.p = parent.p; |
|
} |
|
|
|
@Override |
|
public void accept(long t) { |
|
count = (count + 1) & CANCEL_CHECK_COUNT; |
|
this.t = t; |
|
} |
|
|
|
static final class Taking extends UnorderedWhileSpliterator.OfLong { |
|
Taking(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) { |
|
super(s, noSplitting, p); |
|
} |
|
|
|
Taking(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) { |
|
super(s, parent); |
|
} |
|
|
|
@Override |
|
public boolean tryAdvance(LongConsumer action) { |
|
boolean test = true; |
|
if (takeOrDrop && |
|
checkCancelOnCount() && |
|
s.tryAdvance(this) && |
|
(test = p.test(t))) { // and test on element passes |
|
action.accept(t); |
|
return true; |
|
} |
|
else { |
|
|
|
takeOrDrop = false; |
|
// Cancel all further traversal and splitting operations |
|
|
|
if (!test) |
|
cancel.set(true); |
|
return false; |
|
} |
|
} |
|
|
|
@Override |
|
public Spliterator.OfLong trySplit() { |
|
|
|
return cancel.get() ? null : super.trySplit(); |
|
} |
|
|
|
@Override |
|
Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) { |
|
return new Taking(s, this); |
|
} |
|
} |
|
|
|
static final class Dropping extends UnorderedWhileSpliterator.OfLong { |
|
Dropping(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) { |
|
super(s, noSplitting, p); |
|
} |
|
|
|
Dropping(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) { |
|
super(s, parent); |
|
} |
|
|
|
@Override |
|
public boolean tryAdvance(LongConsumer action) { |
|
if (takeOrDrop) { |
|
takeOrDrop = false; |
|
boolean adv; |
|
boolean dropped = false; |
|
while ((adv = s.tryAdvance(this)) && |
|
checkCancelOnCount() && |
|
p.test(t)) { // and test on element passes |
|
dropped = true; |
|
} |
|
|
|
|
|
if (adv) { |
|
// Cancel all further dropping if one or more elements |
|
|
|
if (dropped) |
|
cancel.set(true); |
|
action.accept(t); |
|
} |
|
return adv; |
|
} |
|
else { |
|
return s.tryAdvance(action); |
|
} |
|
} |
|
|
|
@Override |
|
Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) { |
|
return new Dropping(s, this); |
|
} |
|
} |
|
} |
|
|
|
abstract static class OfDouble extends UnorderedWhileSpliterator<Double, Spliterator.OfDouble> implements DoubleConsumer, Spliterator.OfDouble { |
|
final DoublePredicate p; |
|
double t; |
|
|
|
OfDouble(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) { |
|
super(s, noSplitting); |
|
this.p = p; |
|
} |
|
|
|
OfDouble(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) { |
|
super(s, parent); |
|
this.p = parent.p; |
|
} |
|
|
|
@Override |
|
public void accept(double t) { |
|
count = (count + 1) & CANCEL_CHECK_COUNT; |
|
this.t = t; |
|
} |
|
|
|
static final class Taking extends UnorderedWhileSpliterator.OfDouble { |
|
Taking(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) { |
|
super(s, noSplitting, p); |
|
} |
|
|
|
Taking(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) { |
|
super(s, parent); |
|
} |
|
|
|
@Override |
|
public boolean tryAdvance(DoubleConsumer action) { |
|
boolean test = true; |
|
if (takeOrDrop && |
|
checkCancelOnCount() && |
|
s.tryAdvance(this) && |
|
(test = p.test(t))) { // and test on element passes |
|
action.accept(t); |
|
return true; |
|
} |
|
else { |
|
|
|
takeOrDrop = false; |
|
// Cancel all further traversal and splitting operations |
|
|
|
if (!test) |
|
cancel.set(true); |
|
return false; |
|
} |
|
} |
|
|
|
@Override |
|
public Spliterator.OfDouble trySplit() { |
|
|
|
return cancel.get() ? null : super.trySplit(); |
|
} |
|
|
|
@Override |
|
Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) { |
|
return new Taking(s, this); |
|
} |
|
} |
|
|
|
static final class Dropping extends UnorderedWhileSpliterator.OfDouble { |
|
Dropping(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) { |
|
super(s, noSplitting, p); |
|
} |
|
|
|
Dropping(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) { |
|
super(s, parent); |
|
} |
|
|
|
@Override |
|
public boolean tryAdvance(DoubleConsumer action) { |
|
if (takeOrDrop) { |
|
takeOrDrop = false; |
|
boolean adv; |
|
boolean dropped = false; |
|
while ((adv = s.tryAdvance(this)) && |
|
checkCancelOnCount() && |
|
p.test(t)) { // and test on element passes |
|
dropped = true; |
|
} |
|
|
|
|
|
if (adv) { |
|
// Cancel all further dropping if one or more elements |
|
|
|
if (dropped) |
|
cancel.set(true); |
|
action.accept(t); |
|
} |
|
return adv; |
|
} |
|
else { |
|
return s.tryAdvance(action); |
|
} |
|
} |
|
|
|
@Override |
|
Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) { |
|
return new Dropping(s, this); |
|
} |
|
} |
|
} |
|
} |
|
|
|
|
|
// |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
@SuppressWarnings("serial") |
|
private static final class TakeWhileTask<P_IN, P_OUT> |
|
extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, TakeWhileTask<P_IN, P_OUT>> { |
|
private final AbstractPipeline<P_OUT, P_OUT, ?> op; |
|
private final IntFunction<P_OUT[]> generator; |
|
private final boolean isOrdered; |
|
private long thisNodeSize; |
|
|
|
private boolean shortCircuited; |
|
|
|
private volatile boolean completed; |
|
|
|
TakeWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op, |
|
PipelineHelper<P_OUT> helper, |
|
Spliterator<P_IN> spliterator, |
|
IntFunction<P_OUT[]> generator) { |
|
super(helper, spliterator); |
|
this.op = op; |
|
this.generator = generator; |
|
this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags()); |
|
} |
|
|
|
TakeWhileTask(TakeWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) { |
|
super(parent, spliterator); |
|
this.op = parent.op; |
|
this.generator = parent.generator; |
|
this.isOrdered = parent.isOrdered; |
|
} |
|
|
|
@Override |
|
protected TakeWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) { |
|
return new TakeWhileTask<>(this, spliterator); |
|
} |
|
|
|
@Override |
|
protected final Node<P_OUT> getEmptyResult() { |
|
return Nodes.emptyNode(op.getOutputShape()); |
|
} |
|
|
|
@Override |
|
protected final Node<P_OUT> doLeaf() { |
|
Node.Builder<P_OUT> builder = helper.makeNodeBuilder(-1, generator); |
|
Sink<P_OUT> s = op.opWrapSink(helper.getStreamAndOpFlags(), builder); |
|
|
|
if (shortCircuited = helper.copyIntoWithCancel(helper.wrapSink(s), spliterator)) { |
|
// Cancel later nodes if the predicate returned false |
|
|
|
cancelLaterNodes(); |
|
} |
|
|
|
Node<P_OUT> node = builder.build(); |
|
thisNodeSize = node.count(); |
|
return node; |
|
} |
|
|
|
@Override |
|
public final void onCompletion(CountedCompleter<?> caller) { |
|
if (!isLeaf()) { |
|
Node<P_OUT> result; |
|
shortCircuited = leftChild.shortCircuited | rightChild.shortCircuited; |
|
if (isOrdered && canceled) { |
|
thisNodeSize = 0; |
|
result = getEmptyResult(); |
|
} |
|
else if (isOrdered && leftChild.shortCircuited) { |
|
// If taking finished on the left node then |
|
|
|
thisNodeSize = leftChild.thisNodeSize; |
|
result = leftChild.getLocalResult(); |
|
} |
|
else { |
|
thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; |
|
result = merge(); |
|
} |
|
|
|
setLocalResult(result); |
|
} |
|
|
|
completed = true; |
|
super.onCompletion(caller); |
|
} |
|
|
|
Node<P_OUT> merge() { |
|
if (leftChild.thisNodeSize == 0) { |
|
// If the left node size is 0 then |
|
|
|
return rightChild.getLocalResult(); |
|
} |
|
else if (rightChild.thisNodeSize == 0) { |
|
// If the right node size is 0 then |
|
|
|
return leftChild.getLocalResult(); |
|
} |
|
else { |
|
|
|
return Nodes.conc(op.getOutputShape(), |
|
leftChild.getLocalResult(), rightChild.getLocalResult()); |
|
} |
|
} |
|
|
|
@Override |
|
protected void cancel() { |
|
super.cancel(); |
|
if (isOrdered && completed) |
|
// If the task is completed then clear the result, if any |
|
|
|
setLocalResult(getEmptyResult()); |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
@SuppressWarnings("serial") |
|
private static final class DropWhileTask<P_IN, P_OUT> |
|
extends AbstractTask<P_IN, P_OUT, Node<P_OUT>, DropWhileTask<P_IN, P_OUT>> { |
|
private final AbstractPipeline<P_OUT, P_OUT, ?> op; |
|
private final IntFunction<P_OUT[]> generator; |
|
private final boolean isOrdered; |
|
private long thisNodeSize; |
|
// The index from which elements of the node should be taken |
|
// i.e. the node should be truncated from [takeIndex, thisNodeSize) |
|
|
|
private long index; |
|
|
|
DropWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op, |
|
PipelineHelper<P_OUT> helper, |
|
Spliterator<P_IN> spliterator, |
|
IntFunction<P_OUT[]> generator) { |
|
super(helper, spliterator); |
|
assert op instanceof DropWhileOp; |
|
this.op = op; |
|
this.generator = generator; |
|
this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags()); |
|
} |
|
|
|
DropWhileTask(DropWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) { |
|
super(parent, spliterator); |
|
this.op = parent.op; |
|
this.generator = parent.generator; |
|
this.isOrdered = parent.isOrdered; |
|
} |
|
|
|
@Override |
|
protected DropWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) { |
|
return new DropWhileTask<>(this, spliterator); |
|
} |
|
|
|
@Override |
|
protected final Node<P_OUT> doLeaf() { |
|
boolean isChild = !isRoot(); |
|
// If this not the root and pipeline is ordered and size is known |
|
|
|
long sizeIfKnown = isChild && isOrdered && StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags) |
|
? op.exactOutputSizeIfKnown(spliterator) |
|
: -1; |
|
Node.Builder<P_OUT> builder = helper.makeNodeBuilder(sizeIfKnown, generator); |
|
@SuppressWarnings("unchecked") |
|
DropWhileOp<P_OUT> dropOp = (DropWhileOp<P_OUT>) op; |
|
// If this leaf is the root then there is no merging on completion |
|
|
|
DropWhileSink<P_OUT> s = dropOp.opWrapSink(builder, isOrdered && isChild); |
|
helper.wrapAndCopyInto(s, spliterator); |
|
|
|
Node<P_OUT> node = builder.build(); |
|
thisNodeSize = node.count(); |
|
index = s.getDropCount(); |
|
return node; |
|
} |
|
|
|
@Override |
|
public final void onCompletion(CountedCompleter<?> caller) { |
|
if (!isLeaf()) { |
|
if (isOrdered) { |
|
index = leftChild.index; |
|
// If a contiguous sequence of dropped elements |
|
|
|
if (index == leftChild.thisNodeSize) |
|
index += rightChild.index; |
|
} |
|
|
|
thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; |
|
Node<P_OUT> result = merge(); |
|
setLocalResult(isRoot() ? doTruncate(result) : result); |
|
} |
|
|
|
super.onCompletion(caller); |
|
} |
|
|
|
private Node<P_OUT> merge() { |
|
if (leftChild.thisNodeSize == 0) { |
|
// If the left node size is 0 then |
|
|
|
return rightChild.getLocalResult(); |
|
} |
|
else if (rightChild.thisNodeSize == 0) { |
|
// If the right node size is 0 then |
|
|
|
return leftChild.getLocalResult(); |
|
} |
|
else { |
|
|
|
return Nodes.conc(op.getOutputShape(), |
|
leftChild.getLocalResult(), rightChild.getLocalResult()); |
|
} |
|
} |
|
|
|
private Node<P_OUT> doTruncate(Node<P_OUT> input) { |
|
return isOrdered |
|
? input.truncate(index, input.count(), generator) |
|
: input; |
|
} |
|
} |
|
} |