A Deep Dive into the Java Stream Source Code
Class diagram

Concepts
Pipeline and Stage
A pipeline is the assembly line, i.e. the whole flow from source to terminal operation. A stage is one step of that pipeline. Stage is a fairly abstract notion: it mainly captures the logical ordering of the steps, while what each step actually does, and how, is described by a Sink.
new stream   // stage 0
.filter()    // stage 1
.sorted()    // stage 2
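To make the stage idea concrete, here is a minimal runnable sketch of those three stages (the variable names s0/s1/s2 are purely illustrative):
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StageDemo {
    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(3, 1, 3, 2);
        Stream<Integer> s0 = data.stream();              // stage 0: Head
        Stream<Integer> s1 = s0.filter(e -> e != 2);     // stage 1: StatelessOp
        Stream<Integer> s2 = s1.sorted();                // stage 2: StatefulOp
        // nothing has run yet; the terminal operation starts the pipeline
        List<Integer> result = s2.collect(Collectors.toList());
        System.out.println(result);                      // [1, 3, 3]
    }
}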

Sink
Literally a "sink", as in a kitchen sink. In everyday life a sink does little more than:
turn on the tap: it knows water is about to arrive
hold the water: some operations can be performed while it sits there
open the drain: let the water flow out
The core methods of Java's Sink map onto exactly that:
begin(): tells the sink that elements are about to arrive, so it can do any initialization
accept(): receives one element and operates on it
end(): all elements have been processed.
Look at the sorted() example: the purpose of that stage is to buffer and sort all incoming elements, and only then let them flow downstream.
private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
private T[] array; // buffer used to hold the elements so they can be sorted
private int offset;
SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
super(sink, comparator);
}
@Override
@SuppressWarnings("unchecked")
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
// the upstream calls begin() to tell this sink to initialize: allocate the buffer array
array = (T[]) new Object[(int) size];
}
// the upstream calls end() to signal that all elements have arrived; now the sorting happens
@Override
public void end() {
// do the actual work: sort the buffered elements
Arrays.sort(array, 0, offset, comparator);
// tell the downstream sink to get ready to receive elements
downstream.begin(offset);
// pass the sorted elements downstream one by one
if (!cancellationWasRequested) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
else {
for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
downstream.accept(array[i]);
}
// tell the downstream sink that the flow has ended
downstream.end();
// release the buffer
array = null;
}
// the upstream calls accept(); each incoming element is stored in the buffer
@Override
public void accept(T t) {
array[offset++] = t;
}
}
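The begin()/accept()/end() protocol can be sketched outside the JDK with a simplified interface. The MySink interface below is only an illustrative stand-in, since the real java.util.stream.Sink is package-private:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkSketch {
    // simplified stand-in for java.util.stream.Sink
    interface MySink<T> {
        default void begin(long size) {}   // "water is about to arrive"
        void accept(T t);                  // one element flows in
        default void end() {}              // no more elements
    }

    public static void main(String[] args) {
        // downstream sink: just prints each element it receives
        MySink<Integer> print = t -> System.out.println("got " + t);

        // sorting sink: buffers in accept(), sorts and flushes downstream in end()
        MySink<Integer> sort = new MySink<Integer>() {
            private List<Integer> buffer;
            public void begin(long size) { buffer = new ArrayList<>(); }
            public void accept(Integer t) { buffer.add(t); }
            public void end() {
                buffer.sort(null);                       // natural ordering
                print.begin(buffer.size());
                buffer.forEach(print::accept);
                print.end();
            }
        };

        // the "upstream" drives the protocol: begin -> accept... -> end
        List<Integer> source = Arrays.asList(3, 1, 2);
        sort.begin(source.size());
        source.forEach(sort::accept);
        sort.end();                                      // prints 1, 2, 3
    }
}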
Creating the Head
Question
The official docs say a Stream does not store data, so where does the data live? The answer comes later.
Usage
A stream can be created with Stream.of(), for example:
// creation via of()
Stream<Integer> stream = Stream.of(1, 2, 3);
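Besides Stream.of(), a few other common entry points produce an equivalent stream (a small illustrative sketch):
import java.util.Arrays;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class HeadDemo {
    public static void main(String[] args) {
        Stream<Integer> s1 = Stream.of(1, 2, 3);                    // varargs
        Stream<Integer> s2 = Arrays.asList(1, 2, 3).stream();       // from a Collection
        Stream<Integer> s3 = Arrays.stream(new Integer[]{1, 2, 3}); // from an array
        Stream<Integer> s4 = IntStream.rangeClosed(1, 3).boxed();   // from a range
        System.out.println(s1.count() + " " + s2.count() + " "
                + s3.count() + " " + s4.count());                   // 3 3 3 3
    }
}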
Source analysis
Internally, of() ends up calling:
StreamSupport.stream(Arrays.spliterator(arr, 0, arr.length), false);
The logic of StreamSupport.stream():
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
So StreamSupport.stream() is called and returns a Head object. Head is a subclass of ReferencePipeline and can be understood as the first stage of the pipeline. The interesting constructor logic sits at the top of the super() chain, in AbstractPipeline:
/**
* The source spliterator. Only valid for the head pipeline.
* Before the pipeline is consumed if non-null then {@code sourceSupplier}
* must be null. After the pipeline is consumed if non-null then is set to
* null.
*/
private Spliterator<?> sourceSpliterator;
/**
* Constructor for the head of a stream pipeline.
*
* @param source {@code Spliterator} describing the stream source
* @param sourceFlags the source flags for the stream source, described in
* {@link StreamOpFlag}
* @param parallel {@code true} if the pipeline is parallel
*/
AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;
// keep a field referencing the source's Spliterator; the terminal operation later works on the data through this reference
this.sourceSpliterator = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}

Answer to the question
As the Head constructor shows, a stream never copies the data: the Head stage simply stores a reference to the source's Spliterator in sourceSpliterator, the data stays in the original collection or array, and the terminal operation later pulls elements through that Spliterator.
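A small experiment (assuming an ArrayList source, whose spliterator is late-binding) shows that the stream only holds a reference to the source rather than a copy of the data:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class LateBindingDemo {
    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>(Arrays.asList(1, 2, 3));
        Stream<Integer> stream = source.stream();   // Head only keeps the source's Spliterator
        source.add(4);                              // modify the source BEFORE the terminal op
        System.out.println(stream.count());         // 4 -- the new element is visible
    }
}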
Intermediate operations
A few questions
Usage
Stream<Integer> st = headStream.filter(e -> e != 2).distinct().sorted();
// equivalent to
Stream<Integer> afterFilter = headStream.filter(e -> e != 2);
Stream<Integer> afterDistinct = afterFilter.distinct();
Stream<Integer> afterSort = afterDistinct.sorted();
Filter
What happens when filter(op) is executed?
Stream<Integer> afterFilter = headStream.filter(e -> e != 2);
The filter() method is declared in the Stream interface and implemented in ReferencePipeline.
//ReferencePipeline.class
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
// returns an (anonymous) StatelessOp
// the first constructor argument is this, i.e. the upstream stage
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
filter() returns a StatelessOp (because filter is a stateless operation). StatelessOp is a static abstract inner class of ReferencePipeline that itself extends ReferencePipeline:
//ReferencePipeline.class
/**
* Base class for a stateless intermediate stage of a Stream.
*
* @param <E_IN> type of elements in the upstream source
* @param <E_OUT> type of elements produced by this stage
* @since 1.8
*/
abstract static class StatelessOp<E_IN, E_OUT>
extends ReferencePipeline<E_IN, E_OUT> {
/**
* Construct a new Stream by appending a stateless intermediate
* operation to an existing stream.
*
* @param upstream The upstream pipeline stage
* @param inputShape The stream shape for the upstream pipeline stage
* @param opFlags Operation flags for the new stage
*/
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape,
int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful() {
return false;
}
}
The super() call chain ends up in AbstractPipeline's constructor, which links this stage to the previous one:
//AbstractPipeline.class
/**
* Constructor for appending an intermediate operation stage onto an
* existing pipeline.
*
* @param previousStage the upstream pipeline stage
* @param opFlags the operation flags for the new stage, described in
* {@link StreamOpFlag}
*/
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
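The previousStage/nextStage wiring done here can be pictured with a stripped-down Stage class. This is purely illustrative and not the JDK's type:
public class StageLinkDemo {
    // stripped-down analogue of AbstractPipeline's stage links
    static class Stage {
        final String name;
        final Stage previousStage;   // upstream stage, null for the Head
        Stage nextStage;             // filled in when the next op is appended
        final int depth;

        Stage(String name) {                      // Head constructor
            this.name = name;
            this.previousStage = null;
            this.depth = 0;
        }

        Stage(String name, Stage previousStage) { // intermediate-op constructor
            this.name = name;
            this.previousStage = previousStage;
            previousStage.nextStage = this;       // link the upstream stage to us
            this.depth = previousStage.depth + 1;
        }
    }

    public static void main(String[] args) {
        Stage head = new Stage("head");
        Stage filter = new Stage("filter", head);
        Stage distinct = new Stage("distinct", filter);
        // walk back from the last stage, the same direction wrapSink() uses
        for (Stage s = distinct; s != null; s = s.previousStage) {
            System.out.println(s.depth + ": " + s.name);   // 2: distinct, 1: filter, 0: head
        }
    }
}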

Distinct
Example
Stream<Integer> afterDistinct = afterFilter.distinct();
distinct() is implemented in ReferencePipeline:
@Override
public final Stream<P_OUT> distinct() {
return DistinctOps.makeRef(this);
}
This calls DistinctOps.makeRef(), which returns a StatefulOp that overrides four methods; the actual logic lives in opWrapSink():
/**
* Appends a "distinct" operation to the provided stream, and returns the
* new stream.
*
* @param <T> the type of both input and output elements
* @param upstream a reference stream with element type T
* @return the new stream
*/
static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
// returns an (anonymous) StatefulOp
return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
// the following methods are overridden; bodies omitted...
<P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {...}
@Override
<P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator,
IntFunction<T[]> generator) {...}
@Override
<P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {...}
@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
if (StreamOpFlag.DISTINCT.isKnown(flags)) {
...
} else if (StreamOpFlag.SORTED.isKnown(flags)) {
...
} else {
// returns a Sink.ChainedReference
return new Sink.ChainedReference<T, T>(sink) {
// use a Set to remember elements already seen, for deduplication
Set<T> seen;
// when the upstream calls begin(), initialize the Set
@Override
public void begin(long size) {
seen = new HashSet<>();
downstream.begin(-1);
}
// when the stream ends, release the Set
@Override
public void end() {
seen = null;
downstream.end();
}
// if the element has been seen before, discard it; otherwise pass it downstream
@Override
public void accept(T t) {
if (!seen.contains(t)) {
seen.add(t);
downstream.accept(t);
}
}
};
}
}
};
}
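Because the dedup sink above relies on a HashSet, distinct() is driven by equals()/hashCode() and keeps the first occurrence in encounter order; a quick demonstration:
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DistinctDemo {
    public static void main(String[] args) {
        List<Integer> result = Stream.of(1, 2, 2, 3, 1)
                .distinct()                          // first occurrence wins
                .collect(Collectors.toList());
        System.out.println(result);                  // [1, 2, 3]
    }
}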
StatefulOp is very similar to StatelessOp: both extend ReferencePipeline, and the super() chain likewise runs AbstractPipeline's constructor to link the stages together.
/**
* Base class for a stateful intermediate stage of a Stream.
*
* @param <E_IN> type of elements in the upstream source
* @param <E_OUT> type of elements produced by this stage
* @since 1.8
*/
abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
// body omitted
}

The other intermediate operations all follow the same pattern: each one packages its logic inside an opWrapSink() implementation, and they can be read the same way. One consequence worth noting is shown in the short demonstration below.
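Intermediate operations only append stages, so nothing runs until a terminal operation arrives. A small demonstration with a side-effecting predicate makes this visible:
import java.util.stream.Stream;

public class LazyDemo {
    public static void main(String[] args) {
        Stream<Integer> s = Stream.of(1, 2, 3)
                .filter(e -> { System.out.println("filter sees " + e); return e != 2; });
        System.out.println("stages built, nothing executed yet");
        s.forEach(e -> System.out.println("terminal got " + e));
        // Output: the "stages built" line comes first, then the filter/terminal lines interleave
    }
}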
Answers to the questions
In short: an intermediate operation performs no computation at all. It only appends a new stage (a StatelessOp or StatefulOp) linked to the previous one and supplies an opWrapSink() method that will later build the corresponding Sink.
Terminal operations
A few questions
Usage
Four terminal operations are listed below; internally the Stream API also groups terminal operations into four corresponding kinds (FindOp, ForEachOp, MatchOp, ReduceOp):

// count the elements in the stream -- a ReduceOp
long count = afterLimit.count();
// iterate over all elements -- a ForEachOp
afterLimit.forEach(System.out::println);
// get the first element -- a FindOp
Optional<Integer> any = afterLimit.findFirst();
// test whether any element matches -- a MatchOp
boolean flag = afterLimit.anyMatch(i -> i == 1);
count()
count() is implemented in ReferencePipeline:
@Override
public final long count() {
// map every element to 1L with mapToLong, then sum the results
return mapToLong(e -> 1L).sum();
}
mapToLong()
mapToLong() is itself an intermediate operation:
@Override
public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
Objects.requireNonNull(mapper);
return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedReference<P_OUT, Long>(sink) {
@Override
public void accept(P_OUT u) {
// apply the mapping function and pass the result downstream
downstream.accept(mapper.applyAsLong(u));
}
};
}
};
}
ToLongFunction is a functional interface; the logic inside accept() above is simply to apply it via applyAsLong() and pass the result downstream.
@FunctionalInterface
public interface ToLongFunction<T> {
/**
* Applies this function to the given argument.
*
* @param value the function argument
* @return the function result
*/
long applyAsLong(T value);
}
Now look at sum(), defined in LongPipeline. The argument passed to reduce() is Long::sum, which simply adds two values:
@Override
public final long sum() {
// use better algorithm to compensate for intermediate overflow?
return reduce(0, Long::sum);
}
//public static long sum(long a, long b) {
// return a + b;
//}
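Long::sum is just a LongBinaryOperator; reduce(identity, op) folds it over the elements. A tiny example:
import java.util.function.LongBinaryOperator;
import java.util.stream.LongStream;

public class SumDemo {
    public static void main(String[] args) {
        LongBinaryOperator op = Long::sum;                                  // (a, b) -> a + b
        System.out.println(op.applyAsLong(2, 3));                           // 5
        System.out.println(LongStream.of(1, 2, 3).reduce(0, Long::sum));    // 6
    }
}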
reduce()
reduce() wraps the combining function op into a Sink: makeLong() builds a TerminalOp whose makeSink() produces that Sink.
@Override
public final long reduce(long identity, LongBinaryOperator op) {
return evaluate(ReduceOps.makeLong(identity, op));
}
/**
* Constructs a {@code TerminalOp} that implements a functional reduce on
* {@code long} values.
*
* @param identity the identity for the combining function
* @param operator the combining function
* @return a {@code TerminalOp} implementing the reduction
*/
public static TerminalOp<Long, Long>
makeLong(long identity, LongBinaryOperator operator) {
Objects.requireNonNull(operator);
class ReducingSink
implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
// state holds the running result
private long state;
@Override
public void begin(long size) {
state = identity;
}
// the operator passed in is Long::sum, so accept() keeps folding each incoming value into state
@Override
public void accept(long t) {
state = operator.applyAsLong(state, t);
}
@Override
public Long get() {
return state;
}
@Override
public void combine(ReducingSink other) {
accept(other.state);
}
}
return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
};
}
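For sum(), what ReducingSink does boils down to the following loop: begin() sets state to the identity, accept() folds each element into state, and get() returns it. A hand-rolled equivalent:
import java.util.function.LongBinaryOperator;

public class ReducingSinkSketch {
    public static void main(String[] args) {
        long identity = 0L;
        LongBinaryOperator operator = Long::sum;

        long state = identity;                         // begin(): state = identity
        for (long t : new long[]{1, 2, 3}) {
            state = operator.applyAsLong(state, t);    // accept(): fold t into state
        }
        System.out.println(state);                     // get(): 6
    }
}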
evaluate()
Back to evaluate(), the method in AbstractPipeline that actually executes a terminal operation:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
// check whether the stream has already been consumed
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
// mark the stream as consumed
linkedOrConsumed = true;
// evaluate in parallel or sequentially, depending on the stream
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
Focus on the sequential evaluation path, evaluateSequential(). It has four implementations, one for each of the four kinds of terminal operations mentioned above; count() goes through ReduceOp, so let's look at that one.

@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
// delegate to wrapAndCopyInto(), then read the result from the sink
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
wrapAndCopyInto()
Wraps all stages into a chain of Sinks, then runs copyInto():
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
wrapSink()
This is where, walking from the last stage back to the first, every stage is wrapped into a Sink, turning the linked list of stages into a chain of Sinks.
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
// walk from the last stage back towards the Head (depth > 0 excludes the Head itself)
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
// call each stage's opWrapSink(); every operation class overrides this method
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
// return the sink at the head of the chain
return (Sink<P_IN>) sink;
}
copyInto()
This method is the switch that starts the whole pipeline:
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
// tell the first sink to get ready to receive elements
wrappedSink.begin(spliterator.getExactSizeIfKnown());
// push every remaining element through the sink chain
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
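A rough analogue of wrapSink() plus copyInto(), using plain Consumers instead of the JDK's Sink (names and structure are illustrative only): each op knows how to wrap the sink downstream of it, the wrapping runs from the last op back to the first, and the source is then pushed into the head of the resulting chain.
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

public class WrapAndCopyIntoSketch {
    public static void main(String[] args) {
        // terminal "sink": just prints whatever reaches the end of the pipeline
        Consumer<Integer> terminal = x -> System.out.println("terminal got " + x);

        // op 1: filter(e -> e != 2)   op 2: distinct()
        List<UnaryOperator<Consumer<Integer>>> ops = Arrays.asList(
                down -> x -> { if (x != 2) down.accept(x); },
                down -> new Consumer<Integer>() {
                    final Set<Integer> seen = new HashSet<>();
                    public void accept(Integer x) { if (seen.add(x)) down.accept(x); }
                });

        // like wrapSink(): walk the ops from back to front, wrapping as we go
        Consumer<Integer> wrapped = terminal;
        for (int i = ops.size() - 1; i >= 0; i--) {
            wrapped = ops.get(i).apply(wrapped);
        }

        // like copyInto(): drive the source through the head of the chain
        Arrays.asList(1, 2, 2, 3, 1).forEach(wrapped);   // prints 1, 3
    }
}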
forEachRemaining()
Each source's Spliterator implements forEachRemaining(). In ArrayList's spliterator, for example, the per-element logic looks like this (the single-element variant tryAdvance() is shown; forEachRemaining() performs the same accept() call in a loop over the backing array):
public boolean tryAdvance(Consumer<? super E> action) {
if (action == null)
throw new NullPointerException();
int hi = getFence(), i = index;
if (i < hi) {
index = i + 1;
@SuppressWarnings("unchecked") E e = (E)list.elementData[i];
// push the element into the sink chain via accept()
action.accept(e);
if (list.modCount != expectedModCount)
throw new ConcurrentModificationException();
return true;
}
return false;
}
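Outside the stream machinery, tryAdvance() and forEachRemaining() can also be driven by hand, which is essentially what copyInto() does with the wrapped sink:
import java.util.Arrays;
import java.util.Spliterator;

public class SpliteratorDemo {
    public static void main(String[] args) {
        Spliterator<Integer> sp = Arrays.asList(1, 2, 3).spliterator();
        sp.tryAdvance(e -> System.out.println("tryAdvance: " + e));      // pushes one element: 1
        sp.forEachRemaining(e -> System.out.println("remaining: " + e)); // pushes the rest: 2, 3
    }
}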
Other terminal operations
forEach()
forEach() is implemented in ReferencePipeline:
// from ReferencePipeline.class
@Override
public void forEach(Consumer<? super P_OUT> action) {
// ForEachOps.makeRef() builds the ForEachOp terminal operation
evaluate(ForEachOps.makeRef(action, false));
}
From evaluate() onward the flow is the same as for count(), so it is not repeated here.
findFirst() and anyMatch()
findFirst() and anyMatch() follow the same pattern; here are their entry points:
@Override
public final Optional<P_OUT> findFirst() {
return evaluate(FindOps.makeRef(true));
}
@Override
public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
}
Answers to the questions
In short: a terminal operation is what actually runs the pipeline. It wraps all stages into a Sink chain with wrapSink(), then copyInto() drives the source Spliterator, pushing every element through that chain via begin()/accept()/end().