/*
 * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
|
package java.util.stream; |
|
|
|
import java.util.HashSet; |
|
import java.util.LinkedHashSet; |
|
import java.util.Objects; |
|
import java.util.Set; |
|
import java.util.Spliterator; |
|
import java.util.concurrent.ConcurrentHashMap; |
|
import java.util.concurrent.atomic.AtomicBoolean; |
|
import java.util.function.IntFunction; |
|
/**
 * Factory methods for transforming streams into duplicate-free streams, using
 * {@link Object#equals(Object)} to determine equality.
 *
 * @since 1.8
 */
|
final class DistinctOps {

    // Static-factory holder; never instantiated.
    private DistinctOps() { }

    /**
     * Appends a "distinct" operation to the provided stream, and returns the
     * new stream.
     *
     * @param <T> the type of both input and output elements
     * @param upstream a reference stream with element type {@code T}
     * @return the new stream
     */
    static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
        return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                      StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {

            // Parallel fallback that preserves encounter order: reduce all
            // elements into a LinkedHashSet (insertion-ordered) and wrap the
            // result in a Node.
            <P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
                // If the stream is SORTED then it should also be ORDERED so the following will also
                // preserve the sort order of the elements
                TerminalOp<T, LinkedHashSet<T>> reduceOp
                    = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
                                                             LinkedHashSet::addAll);
                return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
            }

            @Override
            <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                              Spliterator<P_IN> spliterator,
                                              IntFunction<T[]> generator) {
                if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
                    // Upstream is already distinct: this op is a no-op, just
                    // evaluate the pipeline as-is.
                    return helper.evaluate(spliterator, false, generator);
                }
                else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                    // Encounter order must be preserved: use the ordered reduce.
                    return reduce(helper, spliterator);
                }
                else {
                    // Unordered: collect keys concurrently.  ConcurrentHashMap
                    // rejects null keys, so a null element is tracked out of
                    // band in an AtomicBoolean.
                    AtomicBoolean seenNull = new AtomicBoolean(false);
                    ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
                    TerminalOp<T, Void> forEachOp = ForEachOps.makeRef(t -> {
                        if (t == null)
                            seenNull.set(true);
                        else
                            map.putIfAbsent(t, Boolean.TRUE);
                    }, false);
                    forEachOp.evaluateParallel(helper, spliterator);

                    // If null has been seen then copy the key set into a HashSet that supports null values
                    // and add null to it
                    Set<T> keys = map.keySet();
                    if (seenNull.get()) {
                        // TODO Implement a more efficient set-union view, rather than copying
                        keys = new HashSet<>(keys);
                        keys.add(null);
                    }
                    return Nodes.node(keys);
                }
            }

            @Override
            <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
                if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
                    // Upstream is already distinct: no-op, just wrap upstream.
                    return helper.wrapSpliterator(spliterator);
                }
                else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                    // Not lazy: a full barrier (eager reduce) is required to
                    // preserve encounter order.
                    return reduce(helper, spliterator).spliterator();
                }
                else {
                    // Lazy, unordered: de-duplicate on the fly as the
                    // spliterator is traversed.
                    return new StreamSpliterators.DistinctSpliterator<>(helper.wrapSpliterator(spliterator));
                }
            }

            @Override
            Sink<T> opWrapSink(int flags, Sink<T> sink) {
                Objects.requireNonNull(sink);

                if (StreamOpFlag.DISTINCT.isKnown(flags)) {
                    // Upstream already distinct: pass elements straight through.
                    return sink;
                } else if (StreamOpFlag.SORTED.isKnown(flags)) {
                    // Sorted input: duplicates are adjacent, so it suffices to
                    // compare each element against the previously emitted one.
                    return new Sink.ChainedReference<T, T>(sink) {
                        boolean seenNull;   // whether a null has been emitted (null can't be distinguished via lastSeen)
                        T lastSeen;         // most recently emitted element

                        @Override
                        public void begin(long size) {
                            seenNull = false;
                            lastSeen = null;
                            // Downstream size is unknown after de-duplication.
                            downstream.begin(-1);
                        }

                        @Override
                        public void end() {
                            // Drop references so they can be GC'd between uses.
                            seenNull = false;
                            lastSeen = null;
                            downstream.end();
                        }

                        @Override
                        public void accept(T t) {
                            if (t == null) {
                                if (!seenNull) {
                                    seenNull = true;
                                    downstream.accept(lastSeen = null);
                                }
                            } else if (lastSeen == null || !t.equals(lastSeen)) {
                                downstream.accept(lastSeen = t);
                            }
                        }
                    };
                } else {
                    // General case: remember every element seen so far.
                    // HashSet permits a null element, unlike ConcurrentHashMap.
                    return new Sink.ChainedReference<T, T>(sink) {
                        Set<T> seen;    // elements already forwarded downstream

                        @Override
                        public void begin(long size) {
                            seen = new HashSet<>();
                            // Downstream size is unknown after de-duplication.
                            downstream.begin(-1);
                        }

                        @Override
                        public void end() {
                            // Release the set so it can be GC'd.
                            seen = null;
                            downstream.end();
                        }

                        @Override
                        public void accept(T t) {
                            if (!seen.contains(t)) {
                                seen.add(t);
                                downstream.accept(t);
                            }
                        }
                    };
                }
            }
        };
    }
}