| 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 */  | 
 | 
package java.util.stream;  | 
 | 
 | 
 | 
import java.util.LongSummaryStatistics;  | 
 | 
import java.util.Objects;  | 
 | 
import java.util.OptionalDouble;  | 
 | 
import java.util.OptionalLong;  | 
 | 
import java.util.PrimitiveIterator;  | 
 | 
import java.util.Spliterator;  | 
 | 
import java.util.Spliterators;  | 
 | 
import java.util.function.BiConsumer;  | 
 | 
import java.util.function.BinaryOperator;  | 
 | 
import java.util.function.IntFunction;  | 
 | 
import java.util.function.LongBinaryOperator;  | 
 | 
import java.util.function.LongConsumer;  | 
 | 
import java.util.function.LongFunction;  | 
 | 
import java.util.function.LongPredicate;  | 
 | 
import java.util.function.LongToDoubleFunction;  | 
 | 
import java.util.function.LongToIntFunction;  | 
 | 
import java.util.function.LongUnaryOperator;  | 
 | 
import java.util.function.ObjLongConsumer;  | 
 | 
import java.util.function.Supplier;  | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 */  | 
 | 
abstract class LongPipeline<E_IN>  | 
 | 
        extends AbstractPipeline<E_IN, Long, LongStream>  | 
 | 
        implements LongStream { | 
 | 
 | 
 | 
      | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
     */  | 
 | 
    LongPipeline(Supplier<? extends Spliterator<Long>> source,  | 
 | 
                 int sourceFlags, boolean parallel) { | 
 | 
        super(source, sourceFlags, parallel);  | 
 | 
    }  | 
 | 
 | 
 | 
      | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
     */  | 
 | 
    LongPipeline(Spliterator<Long> source,  | 
 | 
                 int sourceFlags, boolean parallel) { | 
 | 
        super(source, sourceFlags, parallel);  | 
 | 
    }  | 
 | 
 | 
 | 
      | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
     */  | 
 | 
    LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { | 
 | 
        super(upstream, opFlags);  | 
 | 
    }  | 
 | 
 | 
 | 
      | 
 | 
 | 
 | 
 | 
 | 
     */  | 
 | 
    private static LongConsumer adapt(Sink<Long> sink) { | 
 | 
        if (sink instanceof LongConsumer) { | 
 | 
            return (LongConsumer) sink;  | 
 | 
        } else { | 
 | 
            if (Tripwire.ENABLED)  | 
 | 
                Tripwire.trip(AbstractPipeline.class,  | 
 | 
                              "using LongStream.adapt(Sink<Long> s)");  | 
 | 
            return sink::accept;  | 
 | 
        }  | 
 | 
    }  | 
 | 
 | 
 | 
      | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
     */  | 
 | 
    private static Spliterator.OfLong adapt(Spliterator<Long> s) { | 
 | 
        if (s instanceof Spliterator.OfLong) { | 
 | 
            return (Spliterator.OfLong) s;  | 
 | 
        } else { | 
 | 
            if (Tripwire.ENABLED)  | 
 | 
                Tripwire.trip(AbstractPipeline.class,  | 
 | 
                              "using LongStream.adapt(Spliterator<Long> s)");  | 
 | 
            throw new UnsupportedOperationException("LongStream.adapt(Spliterator<Long> s)"); | 
 | 
        }  | 
 | 
    }  | 
 | 
 | 
 | 
 | 
 | 
    // Shape-specific methods  | 
 | 
 | 
 | 
    @Override  | 
 | 
    final StreamShape getOutputShape() { | 
 | 
        return StreamShape.LONG_VALUE;  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper,  | 
 | 
                                           Spliterator<P_IN> spliterator,  | 
 | 
                                           boolean flattenTree,  | 
 | 
                                           IntFunction<Long[]> generator) { | 
 | 
        return Nodes.collectLong(helper, spliterator, flattenTree);  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph,  | 
 | 
                                        Supplier<Spliterator<P_IN>> supplier,  | 
 | 
                                        boolean isParallel) { | 
 | 
        return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    @SuppressWarnings("unchecked") | 
 | 
    final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) { | 
 | 
        return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    final void forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) { | 
 | 
        Spliterator.OfLong spl = adapt(spliterator);  | 
 | 
        LongConsumer adaptedSink =  adapt(sink);  | 
 | 
        do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) { | 
 | 
        return Nodes.longBuilder(exactSizeIfKnown);  | 
 | 
    }  | 
 | 
 | 
 | 
 | 
 | 
    // LongStream  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final PrimitiveIterator.OfLong iterator() { | 
 | 
        return Spliterators.iterator(spliterator());  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final Spliterator.OfLong spliterator() { | 
 | 
        return adapt(super.spliterator());  | 
 | 
    }  | 
 | 
 | 
 | 
    // Stateless intermediate ops from LongStream  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final DoubleStream asDoubleStream() { | 
 | 
        return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,  | 
 | 
                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { | 
 | 
            @Override  | 
 | 
            Sink<Long> opWrapSink(int flags, Sink<Double> sink) { | 
 | 
                return new Sink.ChainedLong<Double>(sink) { | 
 | 
                    @Override  | 
 | 
                    public void accept(long t) { | 
 | 
                        downstream.accept((double) t);  | 
 | 
                    }  | 
 | 
                };  | 
 | 
            }  | 
 | 
        };  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final Stream<Long> boxed() { | 
 | 
        return mapToObj(Long::valueOf);  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final LongStream map(LongUnaryOperator mapper) { | 
 | 
        Objects.requireNonNull(mapper);  | 
 | 
        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,  | 
 | 
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { | 
 | 
            @Override  | 
 | 
            Sink<Long> opWrapSink(int flags, Sink<Long> sink) { | 
 | 
                return new Sink.ChainedLong<Long>(sink) { | 
 | 
                    @Override  | 
 | 
                    public void accept(long t) { | 
 | 
                        downstream.accept(mapper.applyAsLong(t));  | 
 | 
                    }  | 
 | 
                };  | 
 | 
            }  | 
 | 
        };  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) { | 
 | 
        Objects.requireNonNull(mapper);  | 
 | 
        return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE,  | 
 | 
                                                          StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { | 
 | 
            @Override  | 
 | 
            Sink<Long> opWrapSink(int flags, Sink<U> sink) { | 
 | 
                return new Sink.ChainedLong<U>(sink) { | 
 | 
                    @Override  | 
 | 
                    public void accept(long t) { | 
 | 
                        downstream.accept(mapper.apply(t));  | 
 | 
                    }  | 
 | 
                };  | 
 | 
            }  | 
 | 
        };  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final IntStream mapToInt(LongToIntFunction mapper) { | 
 | 
        Objects.requireNonNull(mapper);  | 
 | 
        return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,  | 
 | 
                                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { | 
 | 
            @Override  | 
 | 
            Sink<Long> opWrapSink(int flags, Sink<Integer> sink) { | 
 | 
                return new Sink.ChainedLong<Integer>(sink) { | 
 | 
                    @Override  | 
 | 
                    public void accept(long t) { | 
 | 
                        downstream.accept(mapper.applyAsInt(t));  | 
 | 
                    }  | 
 | 
                };  | 
 | 
            }  | 
 | 
        };  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final DoubleStream mapToDouble(LongToDoubleFunction mapper) { | 
 | 
        Objects.requireNonNull(mapper);  | 
 | 
        return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,  | 
 | 
                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { | 
 | 
            @Override  | 
 | 
            Sink<Long> opWrapSink(int flags, Sink<Double> sink) { | 
 | 
                return new Sink.ChainedLong<Double>(sink) { | 
 | 
                    @Override  | 
 | 
                    public void accept(long t) { | 
 | 
                        downstream.accept(mapper.applyAsDouble(t));  | 
 | 
                    }  | 
 | 
                };  | 
 | 
            }  | 
 | 
        };  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final LongStream flatMap(LongFunction<? extends LongStream> mapper) { | 
 | 
        Objects.requireNonNull(mapper);  | 
 | 
        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,  | 
 | 
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { | 
 | 
            @Override  | 
 | 
            Sink<Long> opWrapSink(int flags, Sink<Long> sink) { | 
 | 
                return new Sink.ChainedLong<Long>(sink) { | 
 | 
                      | 
 | 
                    boolean cancellationRequestedCalled;  | 
 | 
 | 
 | 
                      | 
 | 
                    LongConsumer downstreamAsLong = downstream::accept;  | 
 | 
 | 
 | 
                    @Override  | 
 | 
                    public void begin(long size) { | 
 | 
                        downstream.begin(-1);  | 
 | 
                    }  | 
 | 
 | 
 | 
                    @Override  | 
 | 
                    public void accept(long t) { | 
 | 
                        try (LongStream result = mapper.apply(t)) { | 
 | 
                            if (result != null) { | 
 | 
                                if (!cancellationRequestedCalled) { | 
 | 
                                    result.sequential().forEach(downstreamAsLong);  | 
 | 
                                }  | 
 | 
                                else { | 
 | 
                                    Spliterator.OfLong s = result.sequential().spliterator();  | 
 | 
                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong)); | 
 | 
                                }  | 
 | 
                            }  | 
 | 
                        }  | 
 | 
                    }  | 
 | 
 | 
 | 
                    @Override  | 
 | 
                    public boolean cancellationRequested() { | 
 | 
                        // If this method is called then an operation within the stream  | 
 | 
                        // pipeline is short-circuiting (see AbstractPipeline.copyInto).  | 
 | 
                        // Note that we cannot differentiate between an upstream or  | 
 | 
                          | 
 | 
                        cancellationRequestedCalled = true;  | 
 | 
                        return downstream.cancellationRequested();  | 
 | 
                    }  | 
 | 
                };  | 
 | 
            }  | 
 | 
        };  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public LongStream unordered() { | 
 | 
        if (!isOrdered())  | 
 | 
            return this;  | 
 | 
        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) { | 
 | 
            @Override  | 
 | 
            Sink<Long> opWrapSink(int flags, Sink<Long> sink) { | 
 | 
                return sink;  | 
 | 
            }  | 
 | 
        };  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final LongStream filter(LongPredicate predicate) { | 
 | 
        Objects.requireNonNull(predicate);  | 
 | 
        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,  | 
 | 
                                     StreamOpFlag.NOT_SIZED) { | 
 | 
            @Override  | 
 | 
            Sink<Long> opWrapSink(int flags, Sink<Long> sink) { | 
 | 
                return new Sink.ChainedLong<Long>(sink) { | 
 | 
                    @Override  | 
 | 
                    public void begin(long size) { | 
 | 
                        downstream.begin(-1);  | 
 | 
                    }  | 
 | 
 | 
 | 
                    @Override  | 
 | 
                    public void accept(long t) { | 
 | 
                        if (predicate.test(t))  | 
 | 
                            downstream.accept(t);  | 
 | 
                    }  | 
 | 
                };  | 
 | 
            }  | 
 | 
        };  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final LongStream peek(LongConsumer action) { | 
 | 
        Objects.requireNonNull(action);  | 
 | 
        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,  | 
 | 
                                     0) { | 
 | 
            @Override  | 
 | 
            Sink<Long> opWrapSink(int flags, Sink<Long> sink) { | 
 | 
                return new Sink.ChainedLong<Long>(sink) { | 
 | 
                    @Override  | 
 | 
                    public void accept(long t) { | 
 | 
                        action.accept(t);  | 
 | 
                        downstream.accept(t);  | 
 | 
                    }  | 
 | 
                };  | 
 | 
            }  | 
 | 
        };  | 
 | 
    }  | 
 | 
 | 
 | 
    // Stateful intermediate ops from LongStream  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final LongStream limit(long maxSize) { | 
 | 
        if (maxSize < 0)  | 
 | 
            throw new IllegalArgumentException(Long.toString(maxSize));  | 
 | 
        return SliceOps.makeLong(this, 0, maxSize);  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final LongStream skip(long n) { | 
 | 
        if (n < 0)  | 
 | 
            throw new IllegalArgumentException(Long.toString(n));  | 
 | 
        if (n == 0)  | 
 | 
            return this;  | 
 | 
        else  | 
 | 
            return SliceOps.makeLong(this, n, -1);  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final LongStream sorted() { | 
 | 
        return SortedOps.makeLong(this);  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final LongStream distinct() { | 
 | 
        // While functional and quick to implement, this approach is not very efficient.  | 
 | 
          | 
 | 
        return boxed().distinct().mapToLong(i -> (long) i);  | 
 | 
    }  | 
 | 
 | 
 | 
    // Terminal ops from LongStream  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public void forEach(LongConsumer action) { | 
 | 
        evaluate(ForEachOps.makeLong(action, false));  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public void forEachOrdered(LongConsumer action) { | 
 | 
        evaluate(ForEachOps.makeLong(action, true));  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final long sum() { | 
 | 
          | 
 | 
        return reduce(0, Long::sum);  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final OptionalLong min() { | 
 | 
        return reduce(Math::min);  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final OptionalLong max() { | 
 | 
        return reduce(Math::max);  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final OptionalDouble average() { | 
 | 
        long[] avg = collect(() -> new long[2],  | 
 | 
                             (ll, i) -> { | 
 | 
                                 ll[0]++;  | 
 | 
                                 ll[1] += i;  | 
 | 
                             },  | 
 | 
                             (ll, rr) -> { | 
 | 
                                 ll[0] += rr[0];  | 
 | 
                                 ll[1] += rr[1];  | 
 | 
                             });  | 
 | 
        return avg[0] > 0  | 
 | 
               ? OptionalDouble.of((double) avg[1] / avg[0])  | 
 | 
               : OptionalDouble.empty();  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final long count() { | 
 | 
        return map(e -> 1L).sum();  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final LongSummaryStatistics summaryStatistics() { | 
 | 
        return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept,  | 
 | 
                       LongSummaryStatistics::combine);  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final long reduce(long identity, LongBinaryOperator op) { | 
 | 
        return evaluate(ReduceOps.makeLong(identity, op));  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final OptionalLong reduce(LongBinaryOperator op) { | 
 | 
        return evaluate(ReduceOps.makeLong(op));  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final <R> R collect(Supplier<R> supplier,  | 
 | 
                               ObjLongConsumer<R> accumulator,  | 
 | 
                               BiConsumer<R, R> combiner) { | 
 | 
        Objects.requireNonNull(combiner);  | 
 | 
        BinaryOperator<R> operator = (left, right) -> { | 
 | 
            combiner.accept(left, right);  | 
 | 
            return left;  | 
 | 
        };  | 
 | 
        return evaluate(ReduceOps.makeLong(supplier, accumulator, operator));  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final boolean anyMatch(LongPredicate predicate) { | 
 | 
        return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY));  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final boolean allMatch(LongPredicate predicate) { | 
 | 
        return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL));  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final boolean noneMatch(LongPredicate predicate) { | 
 | 
        return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE));  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final OptionalLong findFirst() { | 
 | 
        return evaluate(FindOps.makeLong(true));  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final OptionalLong findAny() { | 
 | 
        return evaluate(FindOps.makeLong(false));  | 
 | 
    }  | 
 | 
 | 
 | 
    @Override  | 
 | 
    public final long[] toArray() { | 
 | 
        return Nodes.flattenLong((Node.OfLong) evaluateToArrayNode(Long[]::new))  | 
 | 
                .asPrimitiveArray();  | 
 | 
    }  | 
 | 
 | 
 | 
 | 
 | 
    //  | 
 | 
 | 
 | 
      | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
     */  | 
 | 
    static class Head<E_IN> extends LongPipeline<E_IN> { | 
 | 
          | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
         */  | 
 | 
        Head(Supplier<? extends Spliterator<Long>> source,  | 
 | 
             int sourceFlags, boolean parallel) { | 
 | 
            super(source, sourceFlags, parallel);  | 
 | 
        }  | 
 | 
 | 
 | 
          | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
         */  | 
 | 
        Head(Spliterator<Long> source,  | 
 | 
             int sourceFlags, boolean parallel) { | 
 | 
            super(source, sourceFlags, parallel);  | 
 | 
        }  | 
 | 
 | 
 | 
        @Override  | 
 | 
        final boolean opIsStateful() { | 
 | 
            throw new UnsupportedOperationException();  | 
 | 
        }  | 
 | 
 | 
 | 
        @Override  | 
 | 
        final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) { | 
 | 
            throw new UnsupportedOperationException();  | 
 | 
        }  | 
 | 
 | 
 | 
        // Optimized sequential terminal operations for the head of the pipeline  | 
 | 
 | 
 | 
        @Override  | 
 | 
        public void forEach(LongConsumer action) { | 
 | 
            if (!isParallel()) { | 
 | 
                adapt(sourceStageSpliterator()).forEachRemaining(action);  | 
 | 
            } else { | 
 | 
                super.forEach(action);  | 
 | 
            }  | 
 | 
        }  | 
 | 
 | 
 | 
        @Override  | 
 | 
        public void forEachOrdered(LongConsumer action) { | 
 | 
            if (!isParallel()) { | 
 | 
                adapt(sourceStageSpliterator()).forEachRemaining(action);  | 
 | 
            } else { | 
 | 
                super.forEachOrdered(action);  | 
 | 
            }  | 
 | 
        }  | 
 | 
    }  | 
 | 
 | 
 | 
      | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
     */  | 
 | 
    abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> { | 
 | 
          | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
         */  | 
 | 
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,  | 
 | 
                    StreamShape inputShape,  | 
 | 
                    int opFlags) { | 
 | 
            super(upstream, opFlags);  | 
 | 
            assert upstream.getOutputShape() == inputShape;  | 
 | 
        }  | 
 | 
 | 
 | 
        @Override  | 
 | 
        final boolean opIsStateful() { | 
 | 
            return false;  | 
 | 
        }  | 
 | 
    }  | 
 | 
 | 
 | 
      | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
     */  | 
 | 
    abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> { | 
 | 
          | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
 | 
         */  | 
 | 
        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,  | 
 | 
                   StreamShape inputShape,  | 
 | 
                   int opFlags) { | 
 | 
            super(upstream, opFlags);  | 
 | 
            assert upstream.getOutputShape() == inputShape;  | 
 | 
        }  | 
 | 
 | 
 | 
        @Override  | 
 | 
        final boolean opIsStateful() { | 
 | 
            return true;  | 
 | 
        }  | 
 | 
 | 
 | 
        @Override  | 
 | 
        abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,  | 
 | 
                                                      Spliterator<P_IN> spliterator,  | 
 | 
                                                      IntFunction<Long[]> generator);  | 
 | 
    }  | 
 | 
}  |