/* |
|
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. |
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
* |
|
* This code is free software; you can redistribute it and/or modify it |
|
* under the terms of the GNU General Public License version 2 only, as |
|
* published by the Free Software Foundation. Oracle designates this |
|
* particular file as subject to the "Classpath" exception as provided |
|
* by Oracle in the LICENSE file that accompanied this code. |
|
* |
|
* This code is distributed in the hope that it will be useful, but WITHOUT |
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
* version 2 for more details (a copy is included in the LICENSE file that |
|
* accompanied this code). |
|
* |
|
* You should have received a copy of the GNU General Public License version |
|
* 2 along with this work; if not, write to the Free Software Foundation, |
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
* |
|
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
* or visit www.oracle.com if you need additional information or have any |
|
* questions. |
|
*/ |
|
package java.util.stream; |
|
import java.util.Spliterator; |
|
import java.util.concurrent.atomic.AtomicReference; |
|
/** |
|
* Abstract class for fork-join tasks used to implement short-circuiting |
|
* stream ops, which can produce a result without processing all elements of the |
|
* stream. |
|
* |
|
* @param <P_IN> type of input elements to the pipeline |
|
* @param <P_OUT> type of output elements from the pipeline |
|
* @param <R> type of intermediate result, may be different from operation |
|
* result type |
|
* @param <K> type of child and sibling tasks |
|
* @since 1.8 |
|
*/ |
|
@SuppressWarnings("serial") |
|
abstract class AbstractShortCircuitTask<P_IN, P_OUT, R, |
|
K extends AbstractShortCircuitTask<P_IN, P_OUT, R, K>> |
|
extends AbstractTask<P_IN, P_OUT, R, K> { |
|
/** |
|
* The result for this computation; this is shared among all tasks and set |
|
* exactly once |
|
*/ |
|
protected final AtomicReference<R> sharedResult; |
|
/** |
|
* Indicates whether this task has been canceled. Tasks may cancel other |
|
* tasks in the computation under various conditions, such as in a |
|
* find-first operation, a task that finds a value will cancel all tasks |
|
* that are later in the encounter order. |
|
*/ |
|
protected volatile boolean canceled; |
|
/** |
|
* Constructor for root tasks. |
|
* |
|
* @param helper the {@code PipelineHelper} describing the stream pipeline |
|
* up to this operation |
|
* @param spliterator the {@code Spliterator} describing the source for this |
|
* pipeline |
|
*/ |
|
protected AbstractShortCircuitTask(PipelineHelper<P_OUT> helper, |
|
Spliterator<P_IN> spliterator) { |
|
super(helper, spliterator); |
|
sharedResult = new AtomicReference<>(null); |
|
} |
|
/** |
|
* Constructor for non-root nodes. |
|
* |
|
* @param parent parent task in the computation tree |
|
* @param spliterator the {@code Spliterator} for the portion of the |
|
* computation tree described by this task |
|
*/ |
|
protected AbstractShortCircuitTask(K parent, |
|
Spliterator<P_IN> spliterator) { |
|
super(parent, spliterator); |
|
sharedResult = parent.sharedResult; |
|
} |
|
/** |
|
* Returns the value indicating the computation completed with no task |
|
* finding a short-circuitable result. For example, for a "find" operation, |
|
* this might be null or an empty {@code Optional}. |
|
* |
|
* @return the result to return when no task finds a result |
|
*/ |
|
protected abstract R getEmptyResult(); |
|
/** |
|
* Overrides AbstractTask version to include checks for early |
|
* exits while splitting or computing. |
|
*/ |
|
@Override |
|
public void compute() { |
|
Spliterator<P_IN> rs = spliterator, ls; |
|
long sizeEstimate = rs.estimateSize(); |
|
long sizeThreshold = getTargetSize(sizeEstimate); |
|
boolean forkRight = false; |
|
@SuppressWarnings("unchecked") K task = (K) this; |
|
AtomicReference<R> sr = sharedResult; |
|
R result; |
|
while ((result = sr.get()) == null) { |
|
if (task.taskCanceled()) { |
|
result = task.getEmptyResult(); |
|
break; |
|
} |
|
if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) { |
|
result = task.doLeaf(); |
|
break; |
|
} |
|
K leftChild, rightChild, taskToFork; |
|
task.leftChild = leftChild = task.makeChild(ls); |
|
task.rightChild = rightChild = task.makeChild(rs); |
|
task.setPendingCount(1); |
|
if (forkRight) { |
|
forkRight = false; |
|
rs = ls; |
|
task = leftChild; |
|
taskToFork = rightChild; |
|
} |
|
else { |
|
forkRight = true; |
|
task = rightChild; |
|
taskToFork = leftChild; |
|
} |
|
taskToFork.fork(); |
|
sizeEstimate = rs.estimateSize(); |
|
} |
|
task.setLocalResult(result); |
|
task.tryComplete(); |
|
} |
|
/** |
|
* Declares that a globally valid result has been found. If another task has |
|
* not already found the answer, the result is installed in |
|
* {@code sharedResult}. The {@code compute()} method will check |
|
* {@code sharedResult} before proceeding with computation, so this causes |
|
* the computation to terminate early. |
|
* |
|
* @param result the result found |
|
*/ |
|
protected void shortCircuit(R result) { |
|
if (result != null) |
|
sharedResult.compareAndSet(null, result); |
|
} |
|
/** |
|
* Sets a local result for this task. If this task is the root, set the |
|
* shared result instead (if not already set). |
|
* |
|
* @param localResult The result to set for this task |
|
*/ |
|
@Override |
|
protected void setLocalResult(R localResult) { |
|
if (isRoot()) { |
|
if (localResult != null) |
|
sharedResult.compareAndSet(null, localResult); |
|
} |
|
else |
|
super.setLocalResult(localResult); |
|
} |
|
/** |
|
* Retrieves the local result for this task |
|
*/ |
|
@Override |
|
public R getRawResult() { |
|
return getLocalResult(); |
|
} |
|
/** |
|
* Retrieves the local result for this task. If this task is the root, |
|
* retrieves the shared result instead. |
|
*/ |
|
@Override |
|
public R getLocalResult() { |
|
if (isRoot()) { |
|
R answer = sharedResult.get(); |
|
return (answer == null) ? getEmptyResult() : answer; |
|
} |
|
else |
|
return super.getLocalResult(); |
|
} |
|
/** |
|
* Mark this task as canceled |
|
*/ |
|
protected void cancel() { |
|
canceled = true; |
|
} |
|
/** |
|
* Queries whether this task is canceled. A task is considered canceled if |
|
* it or any of its parents have been canceled. |
|
* |
|
* @return {@code true} if this task or any parent is canceled. |
|
*/ |
|
protected boolean taskCanceled() { |
|
boolean cancel = canceled; |
|
if (!cancel) { |
|
for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent()) |
|
cancel = parent.canceled; |
|
} |
|
return cancel; |
|
} |
|
/** |
|
* Cancels all tasks which succeed this one in the encounter order. This |
|
* includes canceling all the current task's right sibling, as well as the |
|
* later right siblings of all its parents. |
|
*/ |
|
protected void cancelLaterNodes() { |
|
// Go up the tree, cancel right siblings of this node and all parents |
|
for (@SuppressWarnings("unchecked") K parent = getParent(), node = (K) this; |
|
parent != null; |
|
node = parent, parent = parent.getParent()) { |
|
// If node is a left child of parent, then has a right sibling |
|
if (parent.leftChild == node) { |
|
K rightSibling = parent.rightChild; |
|
if (!rightSibling.canceled) |
|
rightSibling.cancel(); |
|
} |
|
} |
|
} |
|
} |