| 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 */  | 
 | 
package java.util.stream;  | 
 | 
 | 
 | 
import java.util.HashSet;  | 
 | 
import java.util.LinkedHashSet;  | 
 | 
import java.util.Objects;  | 
 | 
import java.util.Set;  | 
 | 
import java.util.Spliterator;  | 
 | 
import java.util.concurrent.ConcurrentHashMap;  | 
 | 
import java.util.concurrent.atomic.AtomicBoolean;  | 
 | 
import java.util.function.IntFunction;  | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 */  | 
 | 
final class DistinctOps { | 
 | 
 | 
 | 
    private DistinctOps() { } | 
 | 
 | 
 | 
      | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
     */  | 
 | 
    static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) { | 
 | 
        return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,  | 
 | 
                                                      StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) { | 
 | 
 | 
 | 
            <P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { | 
 | 
                // If the stream is SORTED then it should also be ORDERED so the following will also  | 
 | 
                  | 
 | 
                TerminalOp<T, LinkedHashSet<T>> reduceOp  | 
 | 
                        = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,  | 
 | 
                                                                 LinkedHashSet::addAll);  | 
 | 
                return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));  | 
 | 
            }  | 
 | 
 | 
 | 
            @Override  | 
 | 
            <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,  | 
 | 
                                              Spliterator<P_IN> spliterator,  | 
 | 
                                              IntFunction<T[]> generator) { | 
 | 
                if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) { | 
 | 
                      | 
 | 
                    return helper.evaluate(spliterator, false, generator);  | 
 | 
                }  | 
 | 
                else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { | 
 | 
                    return reduce(helper, spliterator);  | 
 | 
                }  | 
 | 
                else { | 
 | 
                      | 
 | 
                    AtomicBoolean seenNull = new AtomicBoolean(false);  | 
 | 
                    ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();  | 
 | 
                    TerminalOp<T, Void> forEachOp = ForEachOps.makeRef(t -> { | 
 | 
                        if (t == null)  | 
 | 
                            seenNull.set(true);  | 
 | 
                        else  | 
 | 
                            map.putIfAbsent(t, Boolean.TRUE);  | 
 | 
                    }, false);  | 
 | 
                    forEachOp.evaluateParallel(helper, spliterator);  | 
 | 
 | 
 | 
                    // If null has been seen then copy the key set into a HashSet that supports null values  | 
 | 
                      | 
 | 
                    Set<T> keys = map.keySet();  | 
 | 
                    if (seenNull.get()) { | 
 | 
                          | 
 | 
                        keys = new HashSet<>(keys);  | 
 | 
                        keys.add(null);  | 
 | 
                    }  | 
 | 
                    return Nodes.node(keys);  | 
 | 
                }  | 
 | 
            }  | 
 | 
 | 
 | 
            @Override  | 
 | 
            <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { | 
 | 
                if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) { | 
 | 
                      | 
 | 
                    return helper.wrapSpliterator(spliterator);  | 
 | 
                }  | 
 | 
                else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { | 
 | 
                      | 
 | 
                    return reduce(helper, spliterator).spliterator();  | 
 | 
                }  | 
 | 
                else { | 
 | 
                      | 
 | 
                    return new StreamSpliterators.DistinctSpliterator<>(helper.wrapSpliterator(spliterator));  | 
 | 
                }  | 
 | 
            }  | 
 | 
 | 
 | 
            @Override  | 
 | 
            Sink<T> opWrapSink(int flags, Sink<T> sink) { | 
 | 
                Objects.requireNonNull(sink);  | 
 | 
 | 
 | 
                if (StreamOpFlag.DISTINCT.isKnown(flags)) { | 
 | 
                    return sink;  | 
 | 
                } else if (StreamOpFlag.SORTED.isKnown(flags)) { | 
 | 
                    return new Sink.ChainedReference<T, T>(sink) { | 
 | 
                        boolean seenNull;  | 
 | 
                        T lastSeen;  | 
 | 
 | 
 | 
                        @Override  | 
 | 
                        public void begin(long size) { | 
 | 
                            seenNull = false;  | 
 | 
                            lastSeen = null;  | 
 | 
                            downstream.begin(-1);  | 
 | 
                        }  | 
 | 
 | 
 | 
                        @Override  | 
 | 
                        public void end() { | 
 | 
                            seenNull = false;  | 
 | 
                            lastSeen = null;  | 
 | 
                            downstream.end();  | 
 | 
                        }  | 
 | 
 | 
 | 
                        @Override  | 
 | 
                        public void accept(T t) { | 
 | 
                            if (t == null) { | 
 | 
                                if (!seenNull) { | 
 | 
                                    seenNull = true;  | 
 | 
                                    downstream.accept(lastSeen = null);  | 
 | 
                                }  | 
 | 
                            } else if (lastSeen == null || !t.equals(lastSeen)) { | 
 | 
                                downstream.accept(lastSeen = t);  | 
 | 
                            }  | 
 | 
                        }  | 
 | 
                    };  | 
 | 
                } else { | 
 | 
                    return new Sink.ChainedReference<T, T>(sink) { | 
 | 
                        Set<T> seen;  | 
 | 
 | 
 | 
                        @Override  | 
 | 
                        public void begin(long size) { | 
 | 
                            seen = new HashSet<>();  | 
 | 
                            downstream.begin(-1);  | 
 | 
                        }  | 
 | 
 | 
 | 
                        @Override  | 
 | 
                        public void end() { | 
 | 
                            seen = null;  | 
 | 
                            downstream.end();  | 
 | 
                        }  | 
 | 
 | 
 | 
                        @Override  | 
 | 
                        public void accept(T t) { | 
 | 
                            if (!seen.contains(t)) { | 
 | 
                                seen.add(t);  | 
 | 
                                downstream.accept(t);  | 
 | 
                            }  | 
 | 
                        }  | 
 | 
                    };  | 
 | 
                }  | 
 | 
            }  | 
 | 
        };  | 
 | 
    }  | 
 | 
}  |