Bootstrap

Java Stream 源码深入解析

类图

概念解释

Pipline和Stage

Pipline是流水线,表示一整个流程。Stage表示流水线的其中一个阶段。是一个比较抽象层面的描述,因为stage主要表示一种逻辑上的顺序关系,而具体每一个阶段要干嘛、怎么干,使用Sink来进行描述。

new stream          //stage 0
    .filter()       //stage 1
    .sort()         //stage 2

Sink

直译为水槽,生活中水槽的作用无非

  • 打开水龙头,知道有水要来

  • 水在水槽里, 可以进行一些操作

  • 打开水闸,放水Java中的Sink核心功能为:

  • begin(): 告诉该水槽水流要来了,可以进行一些初始化操作

  • accept():接受水流,然后进行操作

  • end():水流全部处理完了。看一个sort()的示例,sort这个stage的目的就是对所有水流进行排序,然后再流到下游。

private static final class SizedRefSortingSink extends AbstractRefSortingSink {
        private T[] array;  //要进行排序,需要一个数组进行缓存
        private int offset; 

        SizedRefSortingSink(Sink sink, Comparator comparator) {
            super(sink, comparator);
        }

        @Override
        @SuppressWarnings("unchecked")
        public void begin(long size) {
            if (size >= Nodes.MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(Nodes.BAD_SIZE);
            //上游调用begin(),通知sort进行初始化操作,生产一个数组
            array = (T[]) new Object[(int) size];
        }

      //上游调用end()方法,告诉sort水已经全部流过来了。sort开始执行操作
        @Override
        public void end() {
            //操作
            Arrays.sort(array, 0, offset, comparator);
            //告诉sort的下游准备接受水流
            downstream.begin(offset);
            //一个个元素的传递给下游
            if (!cancellationWasRequested) {
                for (int i = 0; i < offset; i++)
                    downstream.accept(array[i]);
            }
            else {
                for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
                    downstream.accept(array[i]);
            }
            //告诉下游水流传递结束
            downstream.end();
            //缓存清空
            array = null;
        }

      //上游调用accept()方法,将水流存储到到sort的缓存数组中
        @Override
        public void accept(T t) {
            array[offset++] = t;
        }
    }

创建Head

疑问

  • 官方说Stream不存储数据,那么数据保存在那里呢?解答在后面。

使用方式

可以使用Stream.of()创建一个流,例如

//创建方式 of()
Stream stream = Stream.of(1, 2, 3);

源码分析

of()方法调用

StreamSupport.stream(Arrays.spliterator(arr, 0, arr.length), false);

stream()方法逻辑:

public static  Stream stream(Spliterator spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

调用了,返回一个Head对象。Head是ReferencePipeline的子类。可以理解为Head是流水线的第一个stage。构造方法的主要了逻辑要一直super()到类

    /**
     * The source spliterator. Only valid for the head pipeline.
     * Before the pipeline is consumed if non-null then {@code sourceSupplier}
     * must be null. After the pipeline is consumed if non-null then is set to
     * null.
     */
    private Spliterator sourceSpliterator;

    /**
     * Constructor for the head of a stream pipeline.
     *
     * @param source {@code Spliterator} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     * {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    AbstractPipeline(Spliterator source,
                     int sourceFlags, boolean parallel) {
        this.previousStage = null;
        //使用一个字段指向数据集合的Spliterator,后续终结操作的时候,引用的方式操作数据
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }

疑问解答

中间操作

几个疑问

使用方式

Stream st = headStream.filter(e-> e=1).distinct().sort();
//等同于
Stream afterFilter = headStream.filter(e -> e != 2);
Stream afterDistinct = afterFilter.distinct();
Stream afterSort = afterDistinct.sort();

Filter

执行filter(op)会发生什么?

Stream afterFilter = head.filter(e -> e = 1);

filter()方法定义在类,实现在类。

//ReferencePipeline.class

@Override
public final Stream filter(Predicate predicate) {
    Objects.requireNonNull(predicate);

    // 返回一个StatelessOp类
    // 构造函数参数为(this,)
    return new StatelessOp(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
        @Override
        Sink opWrapSink(int flags, Sink sink) {
            return new Sink.ChainedReference(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

返回一个类(因为filter是一个无状态操作),看下类,是一个静态抽象内部类,继承了类。

//ReferencePipeline.class

    /**
     * Base class for a stateless intermediate stage of a Stream.
     *
     * @param  type of elements in the upstream source
     * @param  type of elements in produced by this stage
     * @since 1.8
     */
    abstract static class StatelessOp
            extends ReferencePipeline {
        /**
         * Construct a new Stream by appending a stateless intermediate
         * operation to an existing stream.
         *
         * @param upstream The upstream pipeline stage
         * @param inputShape The stream shape for the upstream pipeline stage
         * @param opFlags Operation flags for the new stage
         */
        StatelessOp(AbstractPipeline upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

中间super()会执行类的构造方法, 连接stage之间的关系

//AbstractPipeline.class

    /**
     * Constructor for appending an intermediate operation stage onto an
     * existing pipeline.
     *
     * @param previousStage the upstream pipeline stage
     * @param opFlags the operation flags for the new stage, described in
     * {@link StreamOpFlag}
     */
    AbstractPipeline(AbstractPipeline previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;

        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }

Distinct

示例

Stream afterDistinct = afterFilter.distinct();

distinct的方法实现在类下

@Override
public final Stream distinct() {
    return DistinctOps.makeRef(this);
}

调用DistinctOps类的makeRef()方法,返回一个StatefulOp类,并重写了4个方法,实现逻辑在opWrapSink()中:

    /**
     * Appends a "distinct" operation to the provided stream, and returns the
     * new stream.
     *
     * @param  the type of both input and output elements
     * @param upstream a reference stream with element type T
     * @return the new stream
     */
    static  ReferencePipeline makeRef(AbstractPipeline upstream) {
        // 返回一个StatefulOp类
        return new ReferencePipeline.StatefulOp(upstream, StreamShape.REFERENCE,
                                                      StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {

            // 重写了以下几个方法,内容省略...
             Node reduce(PipelineHelper helper, Spliterator spliterator) {...}

            @Override
             Node opEvaluateParallel(PipelineHelper helper,
                                              Spliterator spliterator,
                                              IntFunction generator) {...}

            @Override
             Spliterator opEvaluateParallelLazy(PipelineHelper helper, Spliterator spliterator) {...}

            @Override
            Sink opWrapSink(int flags, Sink sink) {
                Objects.requireNonNull(sink);

                if (StreamOpFlag.DISTINCT.isKnown(flags)) {
                    ...
                } else if (StreamOpFlag.SORTED.isKnown(flags)) {
                    ...
                } else {
                    // 返回一个SinkChainedReference类
                    return new Sink.ChainedReference(sink) {
                        //使用一个Set缓存数据,进行去重
                        Set seen;

                        //当上游通知begin的时候,初始化Set
                        @Override
                        public void begin(long size) {
                            seen = new HashSet<>();
                            downstream.begin(-1);
                        }

                        //略
                        @Override
                        public void end() {
                            seen = null;
                            downstream.end();
                        }

                        //如果已经存在,之间抛弃
                        @Override
                        public void accept(T t) {
                            if (!seen.contains(t)) {
                                seen.add(t);
                                downstream.accept(t);
                            }
                        }
                    };
                }
            }
        };
    }

StatefulOp类与StatelessOp类相似,都是继承了类,然后中间super()页会执行类的构造方法, 连接stage之间的关系

    /**
     * Base class for a stateful intermediate stage of a Stream.
     *
     * @param  type of elements in the upstream source
     * @param  type of elements in produced by this stage
     * @since 1.8
     */
    abstract static class StatefulOp extends ReferencePipeline {
     //省略
    }

至于其他的中间操作,套路都是类似的,操作逻辑封装在方法里, 可以慢慢的看。

疑问解答

终结操作

几个疑问

使用方式

列举四种终结操作,在Stream提供的API中,也是四类:

// 计算流中元素数量,FindOP
long count = afterLimit.count();

// 遍历所有元素,ForEachOp
afterLimit.forEach(System.out::printl);

// 获取第一个元素,MatchOp
Optional any = afterLimit.findFirst();

// 判断是否,ReduceOp
boolean flag = afterLimit.anyMatch(i -> i == 1);

count()

在类中实现

@Override
public final long count() {
   // 调用mapToLong将所有元素变成1,然后计算sum
    return mapToLong(e -> 1L).sum();
}

maoToLong()

mapToLong()方法,是一个中间操作

@Override
    public final LongStream mapToLong(ToLongFunction mapper) {
        Objects.requireNonNull(mapper);
        return new LongPipeline.StatelessOp(this, StreamShape.REFERENCE,
                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink opWrapSink(int flags, Sink sink) {
                return new Sink.ChainedReference(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        //
                        downstream.accept(mapper.applyAsLong(u));
                    }
                };
            }
        };
    }

ToLongFunction是一个函数式接口类, accept()里的逻辑便是.

@FunctionalInterface
public interface ToLongFunction {

    /**
     * Applies this function to the given argument.
     *
     * @param value the function argument
     * @return the function result
     */
    long applyAsLong(T value);
}

看下Sum()方法,在LongPipeline类中,传入参数是一个Long::sum, sum的作用是相加两个值。

@Override
public final long sum() {
    // use better algorithm to compensate for intermediate overflow?
    return reduce(0, Long::sum);
}

//public static long sum(long a, long b) {
//    return a + b;
//}

reduce()

reduce方法,将操作函数op封装成一个Sink,makeLong()的作用就是会生产一个Sink

@Override
public final long reduce(long identity, LongBinaryOperator op) {
    return evaluate(ReduceOps.makeLong(identity, op));
}

    /**
     * Constructs a {@code TerminalOp} that implements a functional reduce on
     * {@code long} values.
     *
     * @param identity the identity for the combining function
     * @param operator the combining function
     * @return a {@code TerminalOp} implementing the reduction
     */
    public static TerminalOp
    makeLong(long identity, LongBinaryOperator operator) {
        Objects.requireNonNull(operator);
        class ReducingSink
                implements AccumulatingSink, Sink.OfLong {
                
            //state是一个用作记录的值
            private long state;

            @Override
            public void begin(long size) {
                state = identity;
            }

      //参数传进来的就是sun(),所以这里的accept()的作用就是对state不断进行累加
            @Override
            public void accept(long t) {
                state = operator.applyAsLong(state, t);
            }

            @Override
            public Long get() {
                return state;
            }

            @Override
            public void combine(ReducingSink other) {
                accept(other.state);
            }
        }
        return new ReduceOp(StreamShape.LONG_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

evaluate()

看回evaluate()方法,这个方法用来执行终结操作的

final  R evaluate(TerminalOp terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    //判断流是否已被使用
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    //设置使用标记为true
    linkedOrConsumed = true;

    //根据流类型,执行相应的推断操作
    return isParallel()
        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

关注时序流的推断方法,可以看到这个方法的实现分为四种,对应上面提到的四类类型操作,count属于ReduceOp,进去看下。


@Override
public  R evaluateSequential(PipelineHelper helper,
                                   Spliterator spliterator) {
    //调用wrapAndCopyInto()方法
    return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}

wrapAndCopyInto()

保证所有stage -> sink链表,然后执行copyInto()方法

  @Override
    final > S wrapAndCopyInto(S sink, Spliterator spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

warpSink()

就是在这里,从后向前,包装所有的stage阶段,形成一条sink链表。这样将之前一个个stage的链表结构包装成一个个Sink。

  @Override
    @SuppressWarnings("unchecked")
    final  Sink wrapSink(Sink sink) {
        Objects.requireNonNull(sink);
    
        //从后向前遍历
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            //执行每个opWrapSink()方法,这个方法在每个操作类中都进行了重写
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        //返回头sink
        return (Sink) sink;
    }

copyInto()

这个方法是整个流水线的启动开关,流程如下:

  @Override
    final  void copyInto(Sink wrappedSink, Spliterator spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            //通知第一个sink,做好准备接受流
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            //执行
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

forEachRemaining()

在各个容器中都有实现forEachRemaining()这个方法,在ArrayList中:

public boolean tryAdvance(Consumer action) {
    if (action == null)
        throw new NullPointerException();
    int hi = getFence(), i = index;
    if (i < hi) {
        index = i + 1;
        @SuppressWarnings("unchecked") E e = (E)list.elementData[i];
        //执行accept()方法
        action.accept(e);
        if (list.modCount != expectedModCount)
            throw new ConcurrentModificationException();
        return true;
    }
    return false;
}

其他终结操作

forEach()

在类中,实现了forEach()方法,

// from ReferencePipeline.class

@Override
public void forEach(Consumer action) {
    // ForEachOps..
    evaluate(ForEachOps.makeRef(action, false));
}

evaluate后面的逻辑与count后面的一样了,略。

findFirst() anyMatch()

findFirst()和anyMatch()的逻辑也不再多说了,一个套路,看下实现

@Override
public final Optional findFirst() {
    return evaluate(FindOps.makeRef(true));
}

@Override
public final boolean anyMatch(Predicate predicate) {
    return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
}

疑问解答

参考引用: