|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
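/*
 * NOTE(review): the file header comment that ended on this line is missing
 * from this copy; only its closing delimiter remained. Restore the original
 * header text from the authoritative source.
 */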
|
package java.util.stream; |
|
|
|
import java.util.Objects; |
|
import java.util.Spliterator; |
|
import java.util.concurrent.ConcurrentHashMap; |
|
import java.util.concurrent.CountedCompleter; |
|
import java.util.function.Consumer; |
|
import java.util.function.DoubleConsumer; |
|
import java.util.function.IntConsumer; |
|
import java.util.function.IntFunction; |
|
import java.util.function.LongConsumer; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
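/**
 * Static factories for terminal operations that run an action on every
 * element of a stream. {@code makeRef}, {@code makeInt}, {@code makeLong}
 * and {@code makeDouble} each build the {@code ForEachOp} variant for one
 * stream shape; the {@code ordered} argument selects between the ordered
 * and unordered traversal implemented by the nested task classes.
 */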
|
final class ForEachOps { |
|
|
|
/** Not instantiable: this class only hosts static factories and nested types. */
private ForEachOps() {
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action, |
|
boolean ordered) { |
|
Objects.requireNonNull(action); |
|
return new ForEachOp.OfRef<>(action, ordered); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
public static TerminalOp<Integer, Void> makeInt(IntConsumer action, |
|
boolean ordered) { |
|
Objects.requireNonNull(action); |
|
return new ForEachOp.OfInt(action, ordered); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
public static TerminalOp<Long, Void> makeLong(LongConsumer action, |
|
boolean ordered) { |
|
Objects.requireNonNull(action); |
|
return new ForEachOp.OfLong(action, ordered); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
public static TerminalOp<Double, Void> makeDouble(DoubleConsumer action, |
|
boolean ordered) { |
|
Objects.requireNonNull(action); |
|
return new ForEachOp.OfDouble(action, ordered); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
abstract static class ForEachOp<T> |
|
implements TerminalOp<T, Void>, TerminalSink<T, Void> { |
|
private final boolean ordered; |
|
|
|
protected ForEachOp(boolean ordered) { |
|
this.ordered = ordered; |
|
} |
|
|
|
// TerminalOp |
|
|
|
@Override |
|
public int getOpFlags() { |
|
return ordered ? 0 : StreamOpFlag.NOT_ORDERED; |
|
} |
|
|
|
@Override |
|
public <S> Void evaluateSequential(PipelineHelper<T> helper, |
|
Spliterator<S> spliterator) { |
|
return helper.wrapAndCopyInto(this, spliterator).get(); |
|
} |
|
|
|
@Override |
|
public <S> Void evaluateParallel(PipelineHelper<T> helper, |
|
Spliterator<S> spliterator) { |
|
if (ordered) |
|
new ForEachOrderedTask<>(helper, spliterator, this).invoke(); |
|
else |
|
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke(); |
|
return null; |
|
} |
|
|
|
// TerminalSink |
|
|
|
@Override |
|
public Void get() { |
|
return null; |
|
} |
|
|
|
// Implementations |
|
|
|
|
|
static final class OfRef<T> extends ForEachOp<T> { |
|
final Consumer<? super T> consumer; |
|
|
|
OfRef(Consumer<? super T> consumer, boolean ordered) { |
|
super(ordered); |
|
this.consumer = consumer; |
|
} |
|
|
|
@Override |
|
public void accept(T t) { |
|
consumer.accept(t); |
|
} |
|
} |
|
|
|
|
|
static final class OfInt extends ForEachOp<Integer> |
|
implements Sink.OfInt { |
|
final IntConsumer consumer; |
|
|
|
OfInt(IntConsumer consumer, boolean ordered) { |
|
super(ordered); |
|
this.consumer = consumer; |
|
} |
|
|
|
@Override |
|
public StreamShape inputShape() { |
|
return StreamShape.INT_VALUE; |
|
} |
|
|
|
@Override |
|
public void accept(int t) { |
|
consumer.accept(t); |
|
} |
|
} |
|
|
|
|
|
static final class OfLong extends ForEachOp<Long> |
|
implements Sink.OfLong { |
|
final LongConsumer consumer; |
|
|
|
OfLong(LongConsumer consumer, boolean ordered) { |
|
super(ordered); |
|
this.consumer = consumer; |
|
} |
|
|
|
@Override |
|
public StreamShape inputShape() { |
|
return StreamShape.LONG_VALUE; |
|
} |
|
|
|
@Override |
|
public void accept(long t) { |
|
consumer.accept(t); |
|
} |
|
} |
|
|
|
|
|
static final class OfDouble extends ForEachOp<Double> |
|
implements Sink.OfDouble { |
|
final DoubleConsumer consumer; |
|
|
|
OfDouble(DoubleConsumer consumer, boolean ordered) { |
|
super(ordered); |
|
this.consumer = consumer; |
|
} |
|
|
|
@Override |
|
public StreamShape inputShape() { |
|
return StreamShape.DOUBLE_VALUE; |
|
} |
|
|
|
@Override |
|
public void accept(double t) { |
|
consumer.accept(t); |
|
} |
|
} |
|
} |
|
|
|
|
|
@SuppressWarnings("serial") |
|
static final class ForEachTask<S, T> extends CountedCompleter<Void> { |
|
private Spliterator<S> spliterator; |
|
private final Sink<S> sink; |
|
private final PipelineHelper<T> helper; |
|
private long targetSize; |
|
|
|
ForEachTask(PipelineHelper<T> helper, |
|
Spliterator<S> spliterator, |
|
Sink<S> sink) { |
|
super(null); |
|
this.sink = sink; |
|
this.helper = helper; |
|
this.spliterator = spliterator; |
|
this.targetSize = 0L; |
|
} |
|
|
|
ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) { |
|
super(parent); |
|
this.spliterator = spliterator; |
|
this.sink = parent.sink; |
|
this.targetSize = parent.targetSize; |
|
this.helper = parent.helper; |
|
} |
|
|
|
|
|
public void compute() { |
|
Spliterator<S> rightSplit = spliterator, leftSplit; |
|
long sizeEstimate = rightSplit.estimateSize(), sizeThreshold; |
|
if ((sizeThreshold = targetSize) == 0L) |
|
targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate); |
|
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags()); |
|
boolean forkRight = false; |
|
Sink<S> taskSink = sink; |
|
ForEachTask<S, T> task = this; |
|
while (!isShortCircuit || !taskSink.cancellationRequested()) { |
|
if (sizeEstimate <= sizeThreshold || |
|
(leftSplit = rightSplit.trySplit()) == null) { |
|
task.helper.copyInto(taskSink, rightSplit); |
|
break; |
|
} |
|
ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit); |
|
task.addToPendingCount(1); |
|
ForEachTask<S, T> taskToFork; |
|
if (forkRight) { |
|
forkRight = false; |
|
rightSplit = leftSplit; |
|
taskToFork = task; |
|
task = leftTask; |
|
} |
|
else { |
|
forkRight = true; |
|
taskToFork = leftTask; |
|
} |
|
taskToFork.fork(); |
|
sizeEstimate = rightSplit.estimateSize(); |
|
} |
|
task.spliterator = null; |
|
task.propagateCompletion(); |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
*/ |
|
@SuppressWarnings("serial") |
|
static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> { |
|
/* |
|
* Our goal is to ensure that the elements associated with a task are |
|
* processed according to an in-order traversal of the computation tree. |
|
* We use completion counts for representing these dependencies, so that |
|
* a task does not complete until all the tasks preceding it in this |
|
* order complete. We use the "completion map" to associate the next |
|
* task in this order for any left child. We increase the pending count |
|
* of any node on the right side of such a mapping by one to indicate |
|
* its dependency, and when a node on the left side of such a mapping |
|
* completes, it decrements the pending count of its corresponding right |
|
* side. As the computation tree is expanded by splitting, we must |
|
* atomically update the mappings to maintain the invariant that the |
|
* completion map maps left children to the next node in the in-order |
|
* traversal. |
|
* |
|
* Take, for example, the following computation tree of tasks: |
|
* |
|
* a |
|
* / \ |
|
* b c |
|
* / \ / \ |
|
* d e f g |
|
* |
|
* The complete map will contain (not necessarily all at the same time) |
|
* the following associations: |
|
* |
|
* d -> e |
|
* b -> f |
|
* f -> g |
|
* |
|
* Tasks e, f, g will have their pending counts increased by 1. |
|
* |
|
* The following relationships hold: |
|
* |
|
* - completion of d "happens-before" e; |
|
* - completion of d and e "happens-before b; |
|
* - completion of b "happens-before" f; and |
|
* - completion of f "happens-before" g |
|
* |
|
* Thus overall the "happens-before" relationship holds for the |
|
* reporting of elements, covered by tasks d, e, f and g, as specified |
|
* by the forEachOrdered operation. |
|
*/ |
|
|
|
private final PipelineHelper<T> helper; |
|
private Spliterator<S> spliterator; |
|
private final long targetSize; |
|
private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap; |
|
private final Sink<T> action; |
|
private final ForEachOrderedTask<S, T> leftPredecessor; |
|
private Node<T> node; |
|
|
|
protected ForEachOrderedTask(PipelineHelper<T> helper, |
|
Spliterator<S> spliterator, |
|
Sink<T> action) { |
|
super(null); |
|
this.helper = helper; |
|
this.spliterator = spliterator; |
|
this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize()); |
|
|
|
this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.getLeafTarget() << 1)); |
|
this.action = action; |
|
this.leftPredecessor = null; |
|
} |
|
|
|
ForEachOrderedTask(ForEachOrderedTask<S, T> parent, |
|
Spliterator<S> spliterator, |
|
ForEachOrderedTask<S, T> leftPredecessor) { |
|
super(parent); |
|
this.helper = parent.helper; |
|
this.spliterator = spliterator; |
|
this.targetSize = parent.targetSize; |
|
this.completionMap = parent.completionMap; |
|
this.action = parent.action; |
|
this.leftPredecessor = leftPredecessor; |
|
} |
|
|
|
@Override |
|
public final void compute() { |
|
doCompute(this); |
|
} |
|
|
|
private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) { |
|
Spliterator<S> rightSplit = task.spliterator, leftSplit; |
|
long sizeThreshold = task.targetSize; |
|
boolean forkRight = false; |
|
while (rightSplit.estimateSize() > sizeThreshold && |
|
(leftSplit = rightSplit.trySplit()) != null) { |
|
ForEachOrderedTask<S, T> leftChild = |
|
new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor); |
|
ForEachOrderedTask<S, T> rightChild = |
|
new ForEachOrderedTask<>(task, rightSplit, leftChild); |
|
|
|
// Fork the parent task |
|
// Completion of the left and right children "happens-before" |
|
|
|
task.addToPendingCount(1); |
|
// Completion of the left child "happens-before" completion of |
|
|
|
rightChild.addToPendingCount(1); |
|
task.completionMap.put(leftChild, rightChild); |
|
|
|
|
|
if (task.leftPredecessor != null) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
leftChild.addToPendingCount(1); |
|
// Update association of left-predecessor to left-most |
|
|
|
if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) { |
|
// If replaced, adjust the pending count of the parent |
|
|
|
task.addToPendingCount(-1); |
|
} else { |
|
// Left-predecessor has already completed, parent's |
|
// pending count is adjusted by left-predecessor; |
|
|
|
leftChild.addToPendingCount(-1); |
|
} |
|
} |
|
|
|
ForEachOrderedTask<S, T> taskToFork; |
|
if (forkRight) { |
|
forkRight = false; |
|
rightSplit = leftSplit; |
|
task = leftChild; |
|
taskToFork = rightChild; |
|
} |
|
else { |
|
forkRight = true; |
|
task = rightChild; |
|
taskToFork = leftChild; |
|
} |
|
taskToFork.fork(); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
if (task.getPendingCount() > 0) { |
|
// Cannot complete just yet so buffer elements into a Node |
|
|
|
@SuppressWarnings("unchecked") |
|
IntFunction<T[]> generator = size -> (T[]) new Object[size]; |
|
Node.Builder<T> nb = task.helper.makeNodeBuilder( |
|
task.helper.exactOutputSizeIfKnown(rightSplit), |
|
generator); |
|
task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build(); |
|
task.spliterator = null; |
|
} |
|
task.tryComplete(); |
|
} |
|
|
|
@Override |
|
public void onCompletion(CountedCompleter<?> caller) { |
|
if (node != null) { |
|
|
|
node.forEach(action); |
|
node = null; |
|
} |
|
else if (spliterator != null) { |
|
|
|
helper.wrapAndCopyInto(action, spliterator); |
|
spliterator = null; |
|
} |
|
|
|
// The completion of this task *and* the dumping of elements |
|
// "happens-before" completion of the associated left-most leaf task |
|
// of right subtree (if any, which can be this task's right sibling) |
|
|
|
ForEachOrderedTask<S, T> leftDescendant = completionMap.remove(this); |
|
if (leftDescendant != null) |
|
leftDescendant.tryComplete(); |
|
} |
|
} |
|
} |