好文推荐:JAVA进阶之Stream实现原理

2022-12-27 0 1,017

1、操作方式进行分类

好文推荐:JAVA进阶之Stream实现原理

Stream中的操作方式能分成两类:尾端操作方式(Intermediate operations)与完结操作方式(Terminal operations),尾端操作方式而已对操作方式展开了历史记录,多于完结操作才会促发前述的排序(即胶体解释器),这也是Stream在插值大子集时高效率的其原因众所周知。尾端操作方式又能分成无状况(Stateless)操作方式与有状况(Stateful)操作方式,前者是指原素的处置不受以后原素的负面影响;前者是指本操作方式多于领到大部份原素后就可以延续下去。完结操作方式又能分成漏电(short-circuiting)或非漏电操作方式,那个如果较好认知,前者是指碰到这类具备条件的原素就能获得最终结论;而前者是指要处置大部份原素就可以获得最终结论。

或许要展开这般精巧的分割,其原因在于下层对每一类情形的处置方式相同。

好文推荐:JAVA进阶之Stream实现原理

BaseStream表述了流的插值、博戈达、以太网等基本上优点;Stream中表述了map、filter、flatmap等使用者高度关注的常见操作方式;PipelineHelper用作继续执行管线过程中将的操作方式和捕捉输入类别、博戈达度等重要信息Head、StatelessOp、StatefulOp为ReferencePipeline中的外部常量,用作叙述流的操作方式期。

2、源代码预测

上看两个范例:

Listlist=Arrays.asList(“China”,”America”,”Russia”,”Britain”);Listresult=list.stream().filter(e->e.length()>=4).map(e->e.charAt()).map(e->String.valueOf(e)).collect(Collectors.toList());

下面List具体来说聚合了两个stream,接着历经filter、map、四次无状况的尾端操作方式,最终由最终操作方式collect前半段。

下面通过源代码来一次庖丁解牛,看看一步步到底是怎么同时实现的。

聚合流的操作方式是通过调用StreamSupport类下面的方法同时实现的:

2.1 Stream()

publicstaticStreamstream(Spliteratorspliterator, booleanparallel){Objects.requireNonNull(spliterator);returnnewReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel);}

方法很简单,直接new了两个ReferencePipeline.Head对象并返回。Head是ReferencePipeline的常量,而是Stream的常量。也就是说,返回了两个由同时实现的。

追溯源代码能发现,最终通过调用父类的构造方法完成实例化:

publicstaticStreamstream(Spliteratorspliterator, booleanparallel){Objects.requireNonNull(spliterator);//返回了两个由Head同时实现的Stream,三个参数分别代表流的数据源、优点组合、是否博戈达returnnewReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel);}AbstractPipeline(Spliteratorsource, intsourceFlags, booleanparallel){this.previousStage=null;//上两个stage指向nullthis.sourceSpliterator=source;this.sourceStage=this;//源头stage指向自己this.sourceOrOpFlags=sourceFlags&StreamOpFlag.STREAMMASK;// The following is an optimization of:// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIALOPSVALUE);this.combinedFlags=(~(sourceOrOpFlags<<1))&StreamOpFlag.INITIALOPSVALUE;this.depth=;this.parallel=parallel;}

AbstractPipeline类中表述了三个称为“stage”外部变量:

/*** Backlink to the head of the pipeline chain (self if this is the source* stage).*/@SuppressWarnings(“rawtypes”)privatefinalAbstractPipelinesourceStage;/*** The “upstream” pipeline, or null if this is the source stage.*/()privatefinalAbstractPipelinepreviousStage;/*** The next stage in the pipeline, or null if this is the last stage.* Effectively final at the point of linking to the next pipeline.*/()privateAbstractPipelinenextStage;

当前节点同时持有前两个节点与后两个节点的指针,并且保留了头结点的引用,这不是典型的双端链表吗?

基于此,预测下面的构造函数:

前两个节点为空头结点指向自己后两个节点暂时未指定

很显然,构造出的是两个双端列表的头结点。

综上所述,stream函数返回了两个由类同时实现的管线流,且该管线流为两个双端链表的头结点

2.2 filter()

再上看第二步,filter操作方式,具体同时实现在的如下方法:

publicfinalStreamfilter(Predicatepredicate){//入参不能为空Objects.requireNonNull(predicate);//构建了两个StatelessOp对象,即无状况的尾端操作方式returnnewStatelessOp(this, StreamShape.REFERENCE, StreamOpFlag.NOTSIZED){//覆写了父类的两个方法opWrapSink@OverrideSinkopWrapSink(intflags, Sinksink){returnnewSink.ChainedReference(sink){@Overridepublicvoidbegin(longsize){downstream.begin(-1);}@Overridepublicvoidaccept(POUTu){if (predicate.test(u))downstream.accept(u);} };} };}

StatelessOp与一样,也是的外部常量,同样通过调用父类的构造方法完成实例化,注意第两个参数,传入的是this,就是将上一步创建的对象传入,作为的previousStage。

AbstractPipeline(AbstractPipelinepreviousStage, intopFlags){if (previousStage.linkedOrConsumed)thrownewIllegalStateException(MSGSTREAMLINKED);previousStage.linkedOrConsumed=true;previousStage.nextStage=this;//前两个stage指向自己this.previousStage=previousStage;//自己指向前两个stagethis.sourceOrOpFlags=opFlags&StreamOpFlag.OPMASK;this.combinedFlags=StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);this.sourceStage=previousStage.sourceStage;//也保留了头结点的引用if (opIsStateful())sourceStage.sourceAnyStateful=true;this.depth=previousStage.depth+1;}

filter操作方式成为了双端链表的第二环。

值得注意的是,构造StatelessOp时,覆写了父类的两个方法opWrapSink,返回了两个Sink对象,作用暂时未知,猜测后面的操作方式如果会用到。

2.3 map()

再上看接下来的map操作方式:

@Override@SuppressWarnings(“unchecked”)publicfinalStreammap(Functionmapper){Objects.requireNonNull(mapper);returnnewStatelessOp(this, StreamShape.REFERENCE,StreamOpFlag.NOTSORTEDStreamOpFlag.NOTDISTINCT){@OverrideSinkopWrapSink(intflags, Sinksink){returnnewSink.ChainedReference(sink){@Overridepublicvoidaccept(POUTu){downstream.accept(mapper.apply(u));} };} };}

与filter类似,构造了两个StatelessOp对象,追加到双端列表中的末尾。

相同的地方在于opWrapSink方法的同时实现,继续猜测,通过覆写opWrapSink,如果能负面影响管线流的流程,同时实现定制化的操作方式。

调用一系列操作方式后会形成如下所示的双链表结构:

好文推荐:JAVA进阶之Stream实现原理

2.4 collect()

最终上看collect操作方式,相同于filter与map,collect为完结操作方式,肯定有特殊之。

@Override@SuppressWarnings(“unchecked”)publicfinalRcollect(Collectorcollector){Acontainer;if (isParallel()&&(collector.characteristics().contains(Collector.Characteristics.CONCURRENT))&&(!isOrdered() collector.characteristics().contains(Collector.Characteristics.UNORDERED))){container=collector.supplier().get();BiConsumeraccumulator=collector.accumulator();forEach(u->accumulator.accept(container, u));} else {//以太网模式container=evaluate(ReduceOps.makeRef(collector));// evaluate促发}returncollector.characteristics().contains(Collector.Characteristics.IDENTITYFINISH)? (R) container : collector.finisher().apply(container);}

ReduceOps.makeRef(collector)会构造两个TerminalOp对象,传入evaluate方法,追溯源代码,发现最终是调用copyInto方法来启动流水线:

@OverridefinalvoidcopyInto(SinkwrappedSink, Spliteratorspliterator){Objects.requireNonNull(wrappedSink);if (!StreamOpFlag.SHORTCIRCUIT.isKnown(getStreamAndOpFlags())){ //无漏电操作方式wrappedSink.begin(spliterator.getExactSizeIfKnown());//通知开始遍历spliterator.forEachRemaining(wrappedSink);//依次处置每个原素wrappedSink.end();//通知完结遍历} else {//有漏电操作方式copyIntoWithCancel(wrappedSink, spliterator);} }

该方法从数据源Spliterat

前面的filter、map操作方式而已做了一系列的准备工作,并没有继续执行,真正的插值是由完结操作方式collect来促发的。

2.5 Sink

Stream中使用Stage的概念来叙述两个完整的操作方式,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。

stage而已解决了操作方式历史记录的问题,要想让流水线起到应有的作用我们需要一类将所有操作方式叠加到一起的方案。你可能会觉得这很简单,只需要从流水线的head开始依次继续执行每一步的操作方式(包括回调函数)就行了。这听起来似乎是可行的,但是你忽略了前面的Stage并不知道后面Stage到底继续执行了哪种操作方式,和回调函数是哪种形式。换句话说,多于当前Stage本身才知道该如何继续执行自己包含的动作。这就需要有某种协议来协调相邻Stage之间的调用关系。

继续从源代码找答案。

filter、map源代码中,都覆写了两个名为opWrapSink的方法,该方法会返回两个 Sink 对象,而collect正是通过 Sink 来处置过程中将的数据。种种迹象表明,那个名为 Sink 的类在流的处置流程当中扮演了极其重要的角色。

interfaceSinkextendsConsumer{//开始遍历原素以后调用该方法,通知Sink做好准备,size代表要处置的原素总数,如果传入-1代表总数未知或者无限defaultvoidbegin(longsize){}//大部份原素遍历完成后调用,通知Sink没有更多的原素了。defaultvoidend(){}//如果返回true,代表那个Sink不再接收任何数据defaultbooleancancellationRequested(){returnfalse;}//还有两个继承自Consumer的方法,用作接收管线过程中将的数据//void accept(T t);…}

collect操作方式在调用copyInto方法时,传入了两个名为wrappedSink的参数,就是两个 Sink 对象,由AbstractPipeline.wrapSink方法构造:

@Override@SuppressWarnings(“unchecked”)finalSinkwrapSink(Sinksink){Objects.requireNonNull(sink);for (@SuppressWarnings(“rawtypes”)AbstractPipelinep=AbstractPipeline.this; p.depth>; p=p.previousStage){//自本身stage开始,不断调用前两个stage的opWrapSink,直到头节点sink=p.opWrapSink(p.previousStage.combinedFlags, sink);}return (Sink) sink;}

opWrapSink()方法的作用是将当前操作方式与下游 Sink 结合成新 Sink ,试想,只要从流水线的最终两个Stage开始,不断调用上两个Stage的opWrapSink()方法直到头节点,就能获得两个代表了流水线上大部份操作方式的 Sink。

而那个opWrapSink方法不就是前面filter、map源代码中一直很神秘的未知操作方式吗?

至此,任督二脉打通,豁然开朗!

有了下面的协议,相邻Stage之间调用就很方便了,每个Stage都会将自己的操作方式封装到两个Sink里,前两个Stage只需调用后两个Stage的accept()方法即可,并不需要知道其外部是如何处置的。当然对于有状况的操作方式,Sink的begin()和end()方法也是要同时实现的。比如Stream.sorted()是两个有状况的尾端操作方式,其对应的Sink.begin()方法可能会创建两个盛放结论的容器,而accept()方法负责将原素添加到该容器,最终end()负责对容器展开排序。对于漏电操作方式,Sink.cancellationRequested()也是要同时实现的,比如Stream.findFirst()是漏电操作,只要找到两个原素,cancellationRequested()就如果返回true,以便调用者尽快完结查找。Sink的四个接口方法常常相互协作,共同完成排序任务。前述上Stream API外部同时实现的的本质,就是如何重载Sink的这四个接口方法。

好文推荐:JAVA进阶之Stream实现原理

有了Sink对操作方式的包装,Stage之间的调用问题就解决了,继续执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就能了。一类可能的Sink.accept()方法流程是这样的:

voidaccept(Uu){1.使用当前Sink包装的回调函数处置u2.将处置结论传递给流水线下游的Sink}

Sink接口的其他几个方法也是按照这种[处置->转发]的模型同时实现。下面我们结合具体范例看看Stream的尾端操作方式是如何将自身的操作方式包装成Sink和Sink是如何将处置结论转发给下两个Sink的。先看Stream.map()方法:

// Stream.map(),调用该方法将产生两个新的StreampublicfinalStreammap(Functionmapper){//…returnnewStatelessOp(this, StreamShape.REFERENCE,StreamOpFlag.NOTSORTEDStreamOpFlag.NOTDISTINCT){@Override/* opWripSink()方法返回由回调函数包装而成Sink */SinkopWrapSink(intflags, Sinkdownstream){returnnewSink.ChainedReference(downstream){@Overridepublicvoidaccept(POUTu){Rr=mapper.apply(u);//1.使用当前Sink包装的回调函数mapper处置udownstream.accept(r);//2.将处置结论传递给流水线下游的Sink }};} };}

上述代码看似复杂,其实逻辑很简单,就是将回调函数mapper包装到两个Sink当中。由于Stream.map()是两个无状况的尾端操作方式,所以map()方法返回了两个StatelessOp外部类对象(两个新的Stream),调用那个新Stream的opWripSink()方法将获得两个包装了当前回调函数的Sink。

再上看两个复杂一点的范例。Stream.sorted()方法将对Stream中的原素展开排序,显然这是两个有状况的尾端操作,因为读取大部份原素以后是没法获得最终顺序的。抛开模板代码直接进入问题本质,sorted()方法是如何将操作方式封装成Sink的呢?sorted()一类可能封装的Sink代码如下:

// Stream.sort()方法用到的Sink同时实现classRefSortingSinkextendsAbstractRefSortingSink{privateArrayListlist;//存放用作排序的原素RefSortingSink(Sinkdownstream, Comparatorcomparator){super(downstream, comparator);}@Overridepublicvoidbegin(longsize){ …//创建两个存放排序原素的列表list=(size>=)?newArrayList((int) size): newArrayList();}@Overridepublicvoidend(){list.sort(comparator);//多于原素全部接收后就可以开始排序downstream.begin(list.size());if (!cancellationWasRequested){//下游Sink不包含漏电操作方式list.forEach(downstream::accept);//2.将处置结论传递给流水线下游的Sink } else {//下游Sink包含漏电操作方式for (Tt : list){//每次都调用cancellationRequested()询问是否能完结处置。if (downstream.cancellationRequested())break;downstream.accept(t);//2.将处置结论传递给流水线下游的Sink }}downstream.end();list=null;}@Overridepublicvoidaccept(Tt){list.add(t);//1.使用当前Sink包装动作处置t,而已简单的将原素添加到尾端列表当中} }

上述代码完美的展现了Sink的四个接口方法是如何协同工作的:

具体来说beging()方法告诉Sink参与排序的原素个数,方便确定尾端结论容器的的大小;后通过accept()方法将原素添加到尾端结论当中,最终继续执行时调用者会不断调用该方法,直到遍历大部份原素;最终end()方法告诉Sink大部份原素遍历完毕,启动排序步骤,排序完成后将结论传递给下游的Sink;如果下游的Sink是漏电操作方式,将结论传递给下游时不断询问下游cancellationRequested()是否能完结处置。

3、结论收集

最终两个问题是流水线上大部份操作方式都继续执行后,使用者所需要的结论(如果有)在哪里?具体来说要说明的是不是大部份的Stream完结操作方式都需要返回结论,有些操作方式而已为了使用其副作用(Side-effects),比如使用Stream.forEach()方法将结论打印出来就是常见的使用副作用的场景(事实上,除了打印之外其他场景都应避免使用副作用),对于真正需要返回结论的完结操作方式结论存在哪里呢?

特别说明:副作用不如果被滥用,也许你会觉得在Stream.forEach()里展开原素收集是个不错的选择,就像下面代码中那样,但遗憾的是这样使用的正确性和效率都无法保证,因为Stream可能会博戈达继续执行。大多数使用副作用的地方都能使用归约操作方式更安全和有效的完成。

//错误的收集方式ArrayListresults=newArrayList<>();stream.filter(s->pattern.matcher(s).matches()).forEach(s->results.add(s));// Unnecessary use of// side-effects!//正确的收集方式Listresults=stream.filter(s->pattern.matcher(s).matches()).collect(Collectors.toList());// No side-effects!

回到流水线继续执行结论的问题上来,需要返回结论的流水线结论存在哪里呢?这要分相同的情形讨论,下表给出了各种有返回结论的Stream完结操作方式。

对于表中返回boolean或者Optional的操作方式的操作方式,由于值返回两个值,只需要在对应的Sink中历史记录那个值,等到继续执行完结时返回就能了。对于归约操作方式,最终结论放在使用者调用时指定的容器中(容器类别通过收集器指定)。collect(),reduce(),max(),min()都是归约操作方式,虽然max()和min()也是返回两个Optional,但事实上下层是通过调用reduce()方法同时实现的。对于返回是数组的情形,在最终返回数组以后,结论其实是存储在一类叫做Node的数据结构中的。Node是一类多叉树结构,原素存储在树的叶子当中,并且两个叶子节点能存放多个原素。这样做是为了博戈达继续执行方便。

4、博戈达流

如果将下面的范例改为如下形式,管线流将会以博戈达模式处置数据:

List list = Arrays.asList(“China”,”America”,”Russia”,”Britain”); List result = list.stream().parallel().filter(e -> e.length()>=4).map(e -> e.charAt(0)).map(e -> String.valueOf(e)).collect(Collectors.toList());

parallel()方法的同时实现很简单,而已将源stage的博戈达标记只为true:

@Override@SuppressWarnings(“unchecked”)publicfinalSparallel(){sourceStage.parallel=true;return (S) this;}

在完结操作方式通过evaluate方法启动管线流时,会根据博戈达标记来判断:

finalRevaluate(TerminalOpterminalOp){assertgetOutputShape()==terminalOp.inputShape();if (linkedOrConsumed)thrownewIllegalStateException(MSGSTREAMLINKED);linkedOrConsumed=true;returnisParallel()?terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}

collect操作方式会通过ReduceTask来继续执行并发任务:

@OverridepublicRevaluateParallel(PipelineHelperhelper, Spliteratorspliterator){returnnewReduceTask<>(this, helper, spliterator).invoke().get();}

ReduceTask是ForkJoinTask的常量,其实Stream的博戈达处置都是基于Fork/Join框架的,相关类与接口的结构如下图所示:

好文推荐:JAVA进阶之Stream实现原理

fork/join框架是jdk1.7引入的,能以递归方式将博戈达的任务拆分成更小的任务,接着将每个子任务的结论合并起来聚合整体结论。它是ExecutorService接口的两个同时实现,它把子任务分配线程池(ForkJoinPool)中的工作线程。要把任务提交到那个线程池,要创建RecursiveTask的两个常量,如果任务不返回结论则是RecursiveAction的常量。

好文推荐:JAVA进阶之Stream实现原理

对于ReduceTask来说,任务分解的实现表述在其父类AbstractTask的compute()方法当中:

@Overridepublicvoidcompute(){Spliteratorrs=spliterator, ls;// right, left spliteratorslongsizeEstimate=rs.estimateSize();longsizeThreshold=getTargetSize(sizeEstimate);booleanforkRight=false;@SuppressWarnings(“unchecked”)Ktask=(K) this;while (sizeEstimate>sizeThreshold&&(ls=rs.trySplit())!=null){KleftChild, rightChild, taskToFork;task.leftChild=leftChild=task.makeChild(ls);task.rightChild=rightChild=task.makeChild(rs);task.setPendingCount(1);if (forkRight){forkRight=false;rs=ls;task=leftChild;taskToFork=rightChild;} else {forkRight=true;task=rightChild;taskToFork=leftChild;}taskToFork.fork();sizeEstimate=rs.estimateSize();}task.setLocalResult(task.doLeaf());task.tryComplete();}

主要逻辑如下:

量已经小于那个阈值的时候展开排序,否则展开fork 将任务分割成更小的数据块,展开求解。

值得注意的是,这里面有个很重要的参数,用来判断是否需要继续分割成更小的子任务,默认为parallelism*4,parallelism是并发度的意思,默认值为cpu 数1,能通过java.util.concurrent.ForkJoinPool.common.parallelism设置,如果当前分片大小仍然大于处置数据单元的阈值,且分片继续尝试切分成功,那么就继续切分,分别将左右分片的任务创建为新的Task,并且将当前的任务关联为两个新任务的父级任务(逻辑在makeChild 里面)。

先后对左右子节点的任务展开fork,对另外的分区展开分解。同时设定pending 为1,这代表两个task 前述上只会有两个等待的子节点(被fork)。

当任务已经分解到足够小的时候退出循环,尝试展开完结。调用常量同时实现的doLeaf方法,完成最小排序单元的排序任务,并设置到当前任务的localResult中。

接着调用tryComplete方法展开最终任务的扫尾工作,如果该任务pending 值不等于0,则原子的减1,如果已经等于0,说明任务都已经完成,则调用onCompletion 回调,如果该任务是叶子任务,则直接销毁尾端数据完结;如果是尾端节点会将左右子节点的结论展开合并。

最终检查那个任务是否还有父级任务了,如果没有则将该任务置为正常完结,如果还有则尝试递归的去调用父级节点的onCompletion回调,逐级展开任务的合并。

publicfinalvoidtryComplete(){CountedCompletera=this, s=a;for (intc;;){if ((c=a.pending)==){a.onCompletion(s);if ((a=(s=a).completer)==null){s.quietlyComplete();return;} } elseif (U.compareAndSwapInt(a, PENDING, c, c-1))return;} }

博戈达流的同时实现本质上就是在ForkJoin上展开了一层封装,将Stream 不断尝试分解成更小的split,接着使用fork/join 框架分而治之。

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务