jdk-Futeure&ForkJoin框架源码学习

2023-05-28 0 470

大背景

日常生活的排序各项任务绝大部分都是以太网来继续执行,但是如果两个繁杂的各项任务须要展开拆分成数个小各项任务,那么过往是另行写两个递回或是循环式排序等演算法来实现,随着这类市场需求的提高,java7中导入了ForkJoin架构来支持这类排序,能较为高效率的化解大各项任务或须要分拆排序的一些情景须要。

ForkJoin架构如是说

jdk-Futeure&ForkJoin框架源码学习

Java的Fork-Join架构是一类虚拟化博戈达计算各项任务的架构,一般来说被用作处置CPU专门化各项任务。它是Java SE 7及更高版中提供的一类C#,意在协助开发人员更随心所欲地撰写可扩充的博戈达标识符。

Fork-Join架构的核心理念思想是“共管演算法”,将两个大的各项任务拆分成二个小的子各项任务,接着通过递回的形式对那些子各项任务展开处置,最后将它的结论分拆起来得到大各项任务的结论。由于每一子各项任务都能分立继续执行,因此能借助数个处置器和多核心理念CPU的竞争优势来博戈达计算那些子各项任务,进而提高整座各项任务的处置速度。

在采用Fork-Join架构时,须要表述两个承继自RecursiveTask或RecursiveAction的各项任务,接着采用ForkJoinPool.submit()形式将各项任务递交给Fork-Join缓存池来继续执行。其中RecursiveTask回到各项任务的结论,RecursiveAction则不回到结论。

Fork-Join架构不但功能强悍、单纯,而且能智能化地借助多处置器和多核心理念CPU,充分利用Xen的竞争优势。同时,它也有许多其他的技术细节和操控性强化举措,比如说工作盗取演算法等,使它成为处置博戈达各项任务的一类强悍辅助工具。

提议参照:https://www.oracle.com/technical-resources/articles/java/fork-join.html

CPU专门化 与 IO专门化的差别?

    CPU专门化和IO专门化指的是不同类别的排序各项任务,其差别主要表现在对计算资源的借助上。

CPU专门化各项任务(CPU-bound)是指须要大量CPU运算来完成的各项任务,例如繁杂的数学排序、图像处置、科学模拟等。在这种情况下,系统的硬盘、内存操控性相对CPU要好很多,此时往往是CPU Loading 100%,而I/O操作能在很短时间内完成,所以CPU占用率很高,而I/O等待时间很短,因此CPU并不须要等待I/O操作的完成。

IO专门化各项任务(I/O-bound)则是指须要大量I/O操作来完成的各项任务,例如文件读写、网络传输、数据库查询等。在这种情况下,系统的CPU操控性相对硬盘和内存要好很多,此时往往就是CPU在等待I/O操作(比如说硬盘读写)的完成,而CPU Loading并不高。在I/O专门化各项任务中,绝大部分时间会用来等待I/O操作的完成,而CPU占用率则相对较低。在实际应用中,我们须要根据各项任务类别的不同选择合适的排序机配置和演算法强化策略。如果是CPU专门化各项任务,就须要选用配置强劲的CPU,并尽可能缩短I/O等待时间;而如果是I/O专门化各项任务,就须要选用配置高速的硬盘和网络设备,并尽可能合理地借助CPU资源。

注意:python语言由于是解释型语言,对CPU专门化各项任务全力支持不是很好,因为python全局解释器(Global Interpreter Lock,GIL)同一时刻只有两个缓存能够继续执行phton字节码,哪怕有虚拟化,同一时刻的继续执行也不能同时继续执行python字节码。(就是以太网),能参照:https://blog.csdn.net/qq_44993593/article/details/129120146

并发与博戈达有什么差别?

    博戈达(Parallel):博戈达是在同一时刻继续执行数个各项任务,每一各项任务都有自己的处置器或核心理念来分立继续执行。

    并发(Concurrent):并发是在同一时间段内继续执行数个各项任务,那些各项任务会交替采用CPU时间片来继续执行,让用户感觉它在同时运行。

个人理解:例如每次过年,拜年的时候,同一天亲戚分批次来你家拜年中间互相没有遇到(并发),亲戚全部一起来到你家里拜年(博戈达)。

jdk-Futeure&ForkJoin框架源码学习

工作盗取演算法是什么?主要化解什么问题?

工作盗取演算法(work-stealing Algorithm)是一类用作实现各项任务调度的并发编程演算法。该算

原理:工作盗取演算法是基于双端队列(deque)的,每一缓存都有两个自己的工作队列,其中各项任务的继续执行顺序是先进先出(FIFO)。当缓存须要继续执行各项任务时,会从自己的队列中取出最后加入的各项任务展开处置。如果缓存的队列为空,那么它就会去其他缓存的工作队列中盗取两个各项任务来继续执行,盗取的各项任务一般是其他缓存队列的开头(队首)各项任务,这样能有效减少缓存之间的竞争和锁竞争,提高博戈达计算速度。

优点 :

能够高效率地借助CPU资源;

充分发掘多核处置器的操控性;

同时也能够避免缓存因等待某些各项任务而空闲;

缺点:

各项任务盗取的次数越多,缓存之间的负载均衡就越难以保证;

此外,在各项任务博戈达度较低的情况下,各项任务盗取可能会增加一些额外的开销,降低程序的操控性。

jdk-Futeure&ForkJoin框架源码学习

工作盗取演算法的实现逻辑主要分成以下几步:

每一缓存都有两个工作队列,其中各项任务按照先进先出(FIFO)的顺序继续执行。

当缓存须要继续执行各项任务时,它会从自己的队列末尾取出最后加入的各项任务展开处置。

如果缓存的队列为空,那么它会去其他缓存的工作队列中盗取两个各项任务来继续执行。这里须要注意的是,缓存须要选择两个合适的盗取对象,以确保各项任务盗取的负载均衡性。

盗取各项任务一般是从其他缓存队列的开头(队首)展开。当缓存成功地从其他缓存队列盗取了各项任务时,它会立即继续执行该各项任务。

在各项任务继续执行过程中,缓存可能会生成新的子各项任务。那些子各项任务会被放到缓存的本地工作队列中,而不是直接发送到其他缓存的队列中,以避免锁竞争。

缓存在继续执行各项任务过程中,会不断地检查自己的本地队列和其他缓存的队列,以保证各项任务的高效率继续执行和负载均衡性。

当所有各项任务都完成后,缓存退出。

ForkJoin基本采用

类图

jdk-Futeure&ForkJoin框架源码学习

类说明:

ForkJoinTask 是 Java 并发编程中的两个类,它是基于 “分而治之” 的思想,并结合了 “工作盗取” 演算法的一类博戈达计算架构。

在 java.util.concurrent 包中,ForkJoinTask 是两个抽象类,须要通过承继来创建具体的各项任务。它能分割出子各项任务,借助虚拟化博戈达地继续执行那些各项任务,并将结论分拆到一起,最后得到整座各项任务的结论。

其中,常用的子类包括:=

RecursiveAction:用作没有回到结论的各项任务。

RecursiveTask:用作有回到结论的各项任务。

在继续执行各项任务时,ForkJoinPool会将各项任务拆分成数个小各项任务,每一缓存继续执行其中的两个小各项任务,当缓存继续执行完自己的各项任务后,它能去 “盗取” 其他缓存队列中的各项任务来继续执行,以此提高 CPU 采用率和并发效率。

相较为于传统的虚拟化编程,采用 ForkJoinTask 能更好地借助 CPU,减少缓存间的竞争,提高程序的操控性。

/**

 *

 * 功能描述:

 *

 * @param

: 通过fork/join 展开求合排序

 * @return

:

 * @auther

: csh

 * @date

: 2023/4/17 9:52 下午

 */
public class ArraySum extends RecursiveTask<Long>

{

    private static final int THRESHOLD = 1000; // 阈值,当数组大小小于该值时不再展开拆分    private long

[] arr;

    private int

 start;

    private int

 end;

    public ArraySum(long[] arr, int start, int end) 

{

        this

.arr = arr;

        this

.start = start;

        this

.end = end;

    }

    @Override    protected Long compute() 

{

        if (end – start <= THRESHOLD) { // 如果数组大小小于阈值,直接排序            long sum = 0L

;

            for (int

 i = start; i < end; i++) {

                sum += arr[i];

            }

            return

 sum;

        } else { // 如果数组大小大于阈值,拆分成两个各项任务并继续执行            intmid = (start + end) /2

;

            ArraySum left = new

 ArraySum(arr, start, mid);

            ArraySum right = new

 ArraySum(arr, mid, end);

left.fork();

            right.fork();

            return

 left.join() + right.join();

        }

    }

    public static void main(String[] args) 

{

        int N = 100000

;

        long[] arr = new long

[N];

        for (int i = 0

; i < N; i++) {

            arr[i] = i;

        }

        ForkJoinPool pool = new

ForkJoinPool();

        long sum = pool.invoke(new ArraySum(arr, 0

, arr.length));

        System.out.println(“累加起来的结论是: “

 + sum);

    }

}

结论

相加起来的结论是: 4999950000

fork/join的源标识符自学

ForkJoinTask 主要借助了 Unsafe 类的 CAS 操作实现了对各项任务状态的更新。在继续执行完成各项任务时,调用 onCompletion(CountedCompleter caller) 形式通知该各项任务的依赖各项任务或向等待该各项任务的缓存发送信号,以此实现对各项任务结论的分拆和传递。

属性:

static final int

REOPR_SIGNAL: 表示各项任务的初始状态,即等待被继续执行。

static final int

 DONE_MASK: 各项任务完成状态的标识。

static final int

 NORMAL: 各项任务正常完成。

static final int

CANCELLED: 各项任务被取消。

static final int

 EXCEPTIONAL: 各项任务发生异常。

volatile int

status: 表示各项任务的状态,取值可能为 REOPR_SIGNAL、NORMAL、CANCELLED 或 EXCEPTIONAL 中的两个。

volatile

 ForkJoinTask next: 指向下两个等待继续执行的各项任务。

volatile

Thread runner: 继续执行该各项任务的缓存。

final short

 statusFlags: 表示各项任务的状态及其他一些控制信息。

final short

 mode: 表示各项任务的运行模式。

Throwable exception: 各项任务继续执行过程中发生异常时,保存该异常对象。

形式:

public final void fork()

: 将该各项任务加入到当前工作缓存队列中,等待被继续执行。

public final boolean isDone()

: 判断该各项任务是否已完成。

public final boolean cancel(boolean mayInterruptIfRunning)

: 取消该各项任务的继续执行。

public final void completeExceptionally(Throwable ex)

: 异常完成该各项任务,并将发生的异常传递给等待该各项任务的缓存。

protected abstract void compute()

: 子类必须实现的排序形式,用作继续执行具体的各项任务逻辑。

public final void quietlyCompleteRoot()

: 安静地完成该各项任务,并通知等待该各项任务的缓存。如果该各项任务是根各项任务,则将结论放到 ForkJoinPool 中的队列中。

public final int getQueuedTaskCount() public final boolean isCancelled()

: 判断该各项任务是否已取消。

public final boolean isCompletedAbnormally()

: 判断该各项任务是否发生异常。

public final boolean isCompletedNormally()

: 判断该各项任务是否正常完成。

public final Throwable getException() public final ForkJoinTask<V> submit()

: 将该各项任务递交到 ForkJoinPool 中执行,并回到该各项任务的结论。

public final V invoke()

: 在当前缓存中继续执行该各项任务,并回到该各项任务的结论。

public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2)

: 继续执行给定的两个各项任务,并等待这两个各项任务都完成。

public static <T> void invokeAll(ForkJoinTask<T>… tasks)

: 继续执行指定的一组各项任务,并等待所有各项任务都完成。

protected static void reportException(Throwable ex)

: 抛出给定的异常。

私有形式:

final int setCompletion(int completion)

: 原子性地将该各项任务的状态修改为完成状态,同时回到原状态值。

final int doExec(): 继续执行当前各项任务的 compute()

 形式,并回到各项任务的状态值。

final boolean trySetSignal()

: 尝试将当前各项任务的状态从新建转换为信号状态 REOPR_SIGNAL。

static void internalPropagateException(Throwable ex): 尝试将给定的异常对象抛出到外层各项任务。
package

 java.util.concurrent;

import

 java.io.Serializable;

import

java.util.Collection;

import

 java.util.List;

import

 java.util.RandomAccess;

import

 java.lang.ref.WeakReference;

import

 java.lang.ref.ReferenceQueue;

import

 java.util.concurrent.Callable;

import

java.util.concurrent.CancellationException;

import

 java.util.concurrent.ExecutionException;

import

 java.util.concurrent.Future;

import

 java.util.concurrent.RejectedExecutionException;

import

 java.util.concurrent.RunnableFuture;

import

java.util.concurrent.TimeUnit;

import

 java.util.concurrent.TimeoutException;

import

java.util.concurrent.locks.ReentrantLock;

import

 java.lang.reflect.Constructor;

//为抽象类必须被实现,实现Futurepublic abstract class ForkJoinTask<V> implements Future<V>, Serializable 

{

    

/** 状态:初始状态:status = SIGNAL

           正常完成状态:status = NORMAL

取消状态:status = CANCELLED

           异常状态:status = EXCEPTIONAL */
    volatile int status; // accessed directly by pool and workers    static final int DONE_MASK = 0xf0000000; // 各项任务完成时的标识。    static final int NORMAL = 0xf0000000; // 正常各项任务状态。    static final int CANCELLED = 0xc0000000; // 取消状态。    static final int EXCEPTIONAL = 0x80000000; // 异常状态。    static final intSIGNAL =0x00010000; // 初始状态 必须为 >= 1 << 16    static final int SMASK = 0x0000ffff; // 低位掩码,也是最大索引位    

/**

* 原子性地将该各项任务的状态修改为完成状态,同时回到原状态值。

     入参为状态

     */
    private int setCompletion(int completion) 

{

        for (int

 s;;) {

            if ((s = status) < 0

)

                return

 s;

            if (U.compareAndSwapInt(this

, STATUS, s, s | completion)) {

                if ((s >>> 16) != 0

)

                    synchronized (this

) { notifyAll(); }

                return

 completion;

            }

        }

    }

    

/**

     *继续执行当前各项任务的 compute() 形式,并回到各项任务的状态值。

     */
    final int doExec() 

{

        int s; boolean

completed;

        //状态大于0        if ((s = status) >= 0

) {

            try

 {

                //立即继续执行各项任务

                completed = exec();

            } catch

 (Throwable rex) {

                return

setExceptionalCompletion(rex);

            }

            //继续执行后状态判断            if

 (completed)

                //设置状态为正常各项任务状态。

                s = setCompletion(NORMAL);

        }

        //回到状态        return

 s;

    }

    

/**

     * 在等待该各项任务完成时,采用指定的超时时间来阻塞当前缓存。

     */
    final void internalWait(long timeout) 

{

        int

 s;

        //正常状态才继续        if((s = status) >=0 && // force completer to issue notify            //又是Unsafe 的cas来更改状态            U.compareAndSwapInt(this

, STATUS, s, s | SIGNAL)) {

            //同步块            synchronized (this

) {

                //状态大于0 正常 等待timeout时间                if (status >= 0

)

                    try { wait(timeout); } catch

 (InterruptedException ie) { }

                else                    //唤醒所有各项任务

                    notifyAll();

            }

        }

    }

    

/**

     * 阻止非工作缓存,直到完成。

     */
    private int externalAwaitDone() 

{

                int s = ((this instanceof CountedCompleter) ? // try helping

ForkJoinPool.common.externalHelpComplete(

                     (CountedCompleter<?>)this, 0

) :

                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :0

);

        //大于0证明不是初始化及已结束        if (s >= 0 && (s = status) >= 0

) {

            boolean interrupted = false

;

            do

 {

                //通过循环式形式展开cas设置锁                if (U.compareAndSwapInt(this

, STATUS, s, s | SIGNAL)) {

                    synchronized (this

) {

                        if (status >= 0

) {

                            try

 {

                                wait(0L

);

                            } catch

 (InterruptedException ie) {

                                interrupted = true

;

                            }

                        }

                        else

                            notifyAll();

                    }

                }

            } while ((s = status) >= 0

);

            //中断状态为true(有可能结束或异常了)            if

 (interrupted)

                //展开当前缓存中断

                Thread.currentThread().interrupt();

        }

        //反回状态        return

 s;

    }

    

/**

* 等待该各项任务完成,并允许在等待的过程中中断当前缓存。

该形式通过调用 LockSupport.park(this) 形式来实现缓存等待,如果当前缓存被中断,则会抛出 InterruptedException 异常。

     */
    private int externalInterruptibleAwaitDone() throws InterruptedException

{

        int

 s;

        //中断缓存成功 直接抛出        if

(Thread.interrupted())

            throw new

 InterruptedException();

        //如果状态大于0 且 尝试通过缓存池展开继续执行所有各项任务 证明正常        if ((s = status) >= 0

 &&

            (s = ((this instanceof

 CountedCompleter) ?

                  ForkJoinPool.common.externalHelpComplete(

(CountedCompleter<?>)this, 0

) :

                  ForkJoinPool.common.tryExternalUnpush(this

) ? doExec() :

                  0)) >= 0

) {

            //循环式展开状态置为初始化            while((s = status) >=0

) {

                if (U.compareAndSwapInt(this

, STATUS, s, s | SIGNAL)) {

                    synchronized (this

) {

                        if(status >=0

)

                            wait(0L

);

                        else

                            notifyAll();

                    }

                }

            }

        }

        return

 s;

    }

    

/**

     * 等待该各项任务完成,并回到各项任务的状态。

     在等待各项任务完成的过程中,采用自旋锁的形式不断地检查各项任务状态。

如果各项任务状态为完成状态,则回到该各项任务的状态;否则,采用 LockSupport.park(this) 形式挂起当前缓存,并等待各项任务完成。

注意:这个形式在等待过程中由于采用了自旋锁和缓存挂起的形式,因此可能会消耗大量的 CPU 资源。

     *

     */
    private int doJoin() 

{

        //参数声名 状态:s 缓存:t 工作缓存:wt 工作队列:w        int

s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;

                return (s = status) < 0

 ? s :

((t = Thread.currentThread())instanceof

 ForkJoinWorkerThread) ?

            (w = (wt = (ForkJoinWorkerThread)t).workQueue).

tryUnpush(this) && (s = doExec()) < 0

 ? s :

            wt.pool.awaitJoin(w, this, 0L

) :

            externalAwaitDone();

    }

    

/**

* 在当前缓存中继续执行该各项任务,并回到各项任务的状态值(同上类似)

     */
    private int doInvoke() 

{

        int

 s; Thread t; ForkJoinWorkerThread wt;

        return (s = doExec()) < 0

 ? s :

            ((t = Thread.currentThread()) instanceof

 ForkJoinWorkerThread) ?

(wt = (ForkJoinWorkerThread)t).pool.

            awaitJoin(wt.workQueue, this, 0L

) :

            externalAwaitDone();

    }

    // Exception table support    //各项任务异常列表    private static final

 ExceptionNode[] exceptionTable;

    //各项任务异常重入锁    private static final

ReentrantLock exceptionTableLock;

    //存放异常的实例    private static final

 ReferenceQueue exceptionTableRefQueue;

    

/**

     * 异常表的固定容量。

     */
    private static final int EXCEPTION_MAP_CAPACITY = 32

;

    //异常节点类实现    static final class ExceptionNode extends WeakReference<ForkJoinTask<?>>

{

        final

 Throwable ex;

        ExceptionNode next;

        final long thrower; // use id not ref to avoid weak cycles        final int hashCode; // store task hashCode before weak ref disappears

        ExceptionNode(ForkJoinTask task, Throwable ex, ExceptionNode next) {

            super

(task, exceptionTableRefQueue);

            this

.ex = ex;

            this

.next = next;

            this

.thrower = Thread.currentThread().getId();

            this

.hashCode = System.identityHashCode(task);

        }

    }

    

/**

* 记录异常并设置状态。

     *

     * @return

 status on exit

     */
    final int recordExceptionalCompletion(Throwable ex) 

{

        int

 s;

        //状态大于等于0        if((s = status) >=0

) {

            int h = System.identityHashCode(this

);

            //上锁            final

ReentrantLock lock = exceptionTableLock;

            lock.lock();

            try

 {

                expungeStaleExceptions();

                ExceptionNode[] t = exceptionTable;

                int i = h & (t.length – 1

);

                //循环式创建                for

 (ExceptionNode e = t[i]; ; e = e.next) {

                    if (e == null

) {

                        t[i] = new ExceptionNode(this

, ex, t[i]);

                        break

;

                    }

                    if (e.get() == this) // already present                        break

;

                }

            } finally

 {

                lock.unlock();

            }

s = setCompletion(EXCEPTIONAL);

        }

        return

 s;

    }

    

/**

     * 记录异常并可能传播。

     *

     * @return

 status on exit

     */
    private int setExceptionalCompletion(Throwable ex) 

{

        int

 s = recordExceptionalCompletion(ex);

        if

 ((s & DONE_MASK) == EXCEPTIONAL)

internalPropagateException(ex);

        return

 s;

    }

    

/**

     * Hook for exception propagation support for tasks with completers.

     */
    void internalPropagateException(Throwable ex) 

{

    }

    

/**

     * 取消,忽略取消引发的任何异常。

     */
    static final void cancelIgnoringExceptions(ForkJoinTask<?> t) 

{

        if (t != null && t.status >= 0

) {

            try

 {

                //取消                t.cancel(false

);

            } catch

 (Throwable ignore) {

}

        }

    }

    

/**

     * 删除异常节点并清除状态。

     */
    private void clearExceptionalCompletion() 

{

        int h = System.identityHashCode(this

);

        final

 ReentrantLock lock = exceptionTableLock;

        //上锁

        lock.lock();

        try

 {

            ExceptionNode[] t = exceptionTable;

            int i = h & (t.length – 1

);

            ExceptionNode e = t[i];

            ExceptionNode pred = null

;

            //循环式处置            while (e != null

) {

ExceptionNode next = e.next;

                if (e.get() == this

) {

                    if (pred == null

)

                        t[i] = next;

                    else

                        pred.next = next;

                    break

;

                }

pred = e;

                e = next;

            }

            expungeStaleExceptions();

            status = 0

;

        } finally

 {

            //释放锁

            lock.unlock();

        }

    }

        privateThrowablegetThrowableException() 

{

        //不是异常直接回到空        if

 ((status & DONE_MASK) != EXCEPTIONAL)

            return null

;

                int h = System.identityHashCode(this

);

        ExceptionNode e;

        //上锁        final

ReentrantLock lock = exceptionTableLock;

        lock.lock();

        try

 {

            //删除过期的引用

            expungeStaleExceptions();

            ExceptionNode[] t = exceptionTable;

e = t[h & (t.length –1

)];

            while (e != null && e.get() != this

)

                e = e.next;

        } finally

 {

            lock.unlock();

        }

        Throwable ex;

        if (e == null || (ex = e.ex) == null

)

            return null

;

        if

 (e.thrower != Thread.currentThread().getId()) {

            Class ec = ex.getClass();

            try

 {

                Constructor<?> noArgCtor = null

;

                Constructor<?>[] cs = ec.getConstructors();// public ctors only                for (int i = 0

; i < cs.length; ++i) {

                    Constructor c = cs[i];

                    Class [] ps = c.getParameterTypes();

                    if(ps.length ==0

)

                        noArgCtor = c;

                    else if (ps.length == 1 && ps[0

] == Throwable.class) {

Throwable wx = (Throwable)c.newInstance(ex);

                        return (wx == null

) ? ex : wx;

                    }

                }

                if (noArgCtor != null

) {

Throwable wx = (Throwable)(noArgCtor.newInstance());

                    if (wx != null

) {

                        wx.initCause(ex);

                        return

 wx;

                    }

                }

            } catch

 (Exception ignore) {

            }

        }

        return

 ex;

    }

    

/**

     * 删除过期引用的实现形式

     */
    private static void expungeStaleExceptions() 

{

        //循环式筛选        for(Object x; (x = exceptionTableRefQueue.poll()) !=null

😉 {

            if (x instanceof

 ExceptionNode) {

                int

hashCode = ((ExceptionNode)x).hashCode;

                ExceptionNode[] t = exceptionTable;

                int i = hashCode & (t.length – 1

);

                ExceptionNode e = t[i];

ExceptionNode pred =null

;

                while (e != null

) {

                    ExceptionNode next = e.next;

                    if

 (e == x) {

                        if (pred == null

)

t[i] = next;

                        else

                            pred.next = next;

                        break

;

                    }

                    pred = e;

                    e = next;

                }

            }

        }

    }

    

/**

     * 轮询过时的引用并将其删除(带锁)

     */
    static final void helpExpungeStaleExceptions() 

{

        final

 ReentrantLock lock = exceptionTableLock;

        if

 (lock.tryLock()) {

            try

 {

                //调用上面的形式

expungeStaleExceptions();

            } finally

 {

                lock.unlock();

            }

        }

    }

    

/**

     * 重新抛出异常的形式

     */
    static void rethrow(Throwable ex) 

{

        if(ex !=null

)

            ForkJoinTask.uncheckedThrow(ex);

    }

    

/**

     * The sneaky part of sneaky throw, relying on generics

* limitations to evade compiler complaints about rethrowing

     * unchecked exceptions

     */
    @SuppressWarnings(“unchecked”) static        void uncheckedThrow(Throwable t) throws T

{

        throw (T)t; // rely on vacuous cast

    }

    

/**

* 根据s的不同抛出不同的异常

     */
    private void reportException(int s) 

{

        if

 (s == CANCELLED)

            throw new

 CancellationException();

        if

(s == EXCEPTIONAL)

            rethrow(getThrowableException());

    }

    // public methods    

/**

     * 将该各项任务加入到当前工作缓存队列中,等待被继续执行。

     */
    public final ForkJoinTask<V> fork() 

{

        Thread t;

                if((t = Thread.currentThread())instanceof

 ForkJoinWorkerThread)

            ((ForkJoinWorkerThread)t).workQueue.push(this

);

        else            //不是则加入工作池队列            ForkJoinPool.common.externalPush(this

);

        return this

;

    }

    

/**

     *回到排序的结论

     注意:此形式异常会导致中断

     */
    public final V join() 

{

        int

 s;

        if

 ((s = doJoin() & DONE_MASK) != NORMAL)

            reportException(s);

        //回到排序结论        return

 getRawResult();

    }

    

/**

     * 在当前缓存中继续执行该各项任务,并回到该各项任务的结论。

     *

     * 回到排序结论

     */
    public final V invoke() 

{

        int

 s;

        if

((s = doInvoke() & DONE_MASK) != NORMAL)

            reportException(s);

        return

 getRawResult();

    }

    

/**

     * 继续执行给定的两个各项任务,并等待这两个各项任务都完成。

     */
    public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) 

{

        int

 s1, s2;

        t2.fork();

        if

((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)

            t1.reportException(s1);

        if

 ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)

            t2.reportException(s2);

    }

    

/**

     * 继续执行指定的一组各项任务,并等待所有各项任务都完成。

     */
    public static void invokeAll(ForkJoinTask<?>… tasks) 

{

        Throwable ex = null

;

        最后两个节点下标        int last = tasks.length – 1

;

        //倒序        for (int i = last; i >= 0

; –i) {

            

            ForkJoinTask t = tasks[i];

            //为空 展开抛出空指针异常            if (t == null

) {

                if (ex == null

)

                    ex = new

 NullPointerException();

            }

            //不为空 展开加入队列            else if (i != 0

)

t.fork();

            //继续执行并回到结论 如果状态为NORMAL 且 ex为空抛出异常            else if (t.doInvoke() < NORMAL && ex == null

)

ex = t.getException();

        }

        //自增展开        for (int i = 1

; i <= last; ++i) {

            ForkJoinTask t = tasks[i];

            if (t != null

) {

                if (ex != null

)

                    t.cancel(false

);

                //加入继续执行队列                else if

 (t.doJoin() < NORMAL)

                    ex = t.getException();

            }

        }

        //异常不为空则抛出异常        if (ex != null

)

rethrow(ex);

    }

    //同上类似    public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) 

{

        if(!(tasksinstanceof RandomAccess) || !(tasks instanceof

 List )) {

            invokeAll(tasks.toArray(new

 ForkJoinTask [tasks.size()]));

            return

 tasks;

        }

        @SuppressWarnings(“unchecked”

)

        List > ts =

            (List >) tasks;

        Throwable ex = null

;

        intlast = ts.size() –1

;

        for (int i = last; i >= 0

; –i) {

            ForkJoinTask t = ts.get(i);

            if (t == null

) {

                if (ex == null

)

                    ex = new

 NullPointerException();

            }

            else if (i != 0

)

                t.fork();

            else if (t.doInvoke() < NORMAL && ex == null

)

ex = t.getException();

        }

        for (int i = 1

; i <= last; ++i) {

            ForkJoinTask t = ts.get(i);

            if (t != null

) {

                if (ex != null

)

                    t.cancel(false

);

                else if

 (t.doJoin() < NORMAL)

                    ex = t.getException();

            }

        }

        if (ex != null

)

            rethrow(ex);

        return

 tasks;

    }

    //尝试取消此各项任务的继续执行。    public boolean cancel(boolean mayInterruptIfRunning) 

{

        return

(setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;

    }

    //判断是否结束 true是 false否    public final boolean isDone() 

{

        return status < 0

;

    }

    // 判断该各项任务是否已取消。    public final boolean isCancelled() 

{

        return

 (status & DONE_MASK) == CANCELLED;

    }

    

/**

* 判断该各项任务是否发生异常。true是 false否

     */
    public final boolean isCompletedAbnormally() 

{

        return

 status < NORMAL;

    }

    

/**

* 判断该各项任务是否正常完成。true是 false否

     */
    public final boolean isCompletedNormally() 

{

        return

 (status & DONE_MASK) == NORMAL;

    }

    

/**

public final Throwable getException() {

        int s = status & DONE_MASK;

        return ((s >= NORMAL) ? null :

(s == CANCELLED) ? new CancellationException() :

                getThrowableException());

    }

    /**

     * 异常完成该各项任务,并将发生的异常传递给等待该各项任务的缓存。

*/
    public void completeExceptionally(Throwable ex) 

{

        setExceptionalCompletion((ex instanceof

RuntimeException) ||

                                 (ex instanceof

 Error) ? ex :

                                 new

 RuntimeException(ex));

    }

    

/**

     *用作继续执行具体的各项任务逻辑。

     */
    public void complete(V value) 

{

        try

 {

            setRawResult(value);

        } catch

 (Throwable rex) {

            setExceptionalCompletion(rex);

            return

;

        }

setCompletion(NORMAL);

    }

    

/**

     * 在不设置值的情况下正常完成此各项任务

     *

     * @since

 1.8

     */
    public final void quietlyComplete() 

{

        setCompletion(NORMAL);

}

    

/**等待排序完成,接着检索其结论。

     */
    public final V get() throws InterruptedException, ExecutionException

{

        ints = (Thread.currentThread()instanceof

 ForkJoinWorkerThread) ?

            doJoin() : externalInterruptibleAwaitDone();

Throwable ex;

        if

 ((s &= DONE_MASK) == CANCELLED)

            throw new

 CancellationException();

        if(s == EXCEPTIONAL && (ex = getThrowableException()) !=null

)

            throw new

 ExecutionException(ex);

        return

 getRawResult();

    }

    

/**

* 带超时的等待继续执行结束,获取结论

     */
    public final V get(long timeout, TimeUnit unit)        throwsInterruptedException, ExecutionException, TimeoutException

{

        int

 s;

        long

 nanos = unit.toNanos(timeout);

        if

 (Thread.interrupted())

            throw new

InterruptedException();

        if ((s = status) >= 0 && nanos > 0L

) {

            long

 d = System.nanoTime() + nanos;

            long deadline = (d == 0L) ?1L : d; // avoid 0

            Thread t = Thread.currentThread();

            if (t instanceof

 ForkJoinWorkerThread) {

ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;

                s = wt.pool.awaitJoin(wt.workQueue, this

, deadline);

            }

            else if((s = ((this instanceof

 CountedCompleter) ?

                           ForkJoinPool.common.externalHelpComplete(

                               (CountedCompleter<?>)this, 0

) :

                           ForkJoinPool.common.tryExternalUnpush(this

) ?

                           doExec() : 0)) >= 0

) {

                long ns, ms; // measure in nanosecs, but wait in millisecs                while ((s = status) >= 0

 &&

                       (ns = deadline – System.nanoTime()) > 0L

) {

                    if((ms = TimeUnit.NANOSECONDS.toMillis(ns)) >0L

 &&

                        U.compareAndSwapInt(this

, STATUS, s, s | SIGNAL)) {

                        synchronized (this

) {

                            if (status >= 0

)

                                wait(ms); // OK to throw InterruptedException                            else

                                notifyAll();

                        }

                    }

                }

            }

        }

        if(s >=0

)

            s = status;

        if

 ((s &= DONE_MASK) != NORMAL) {

            Throwable ex;

            if

 (s == CANCELLED)

                throw new

CancellationException();

            if

 (s != EXCEPTIONAL)

                throw new

 TimeoutException();

            if ((ex = getThrowableException()) != null

)

                throw new

 ExecutionException(ex);

        }

        return

 getRawResult();

    }

    

/**

     * 加入此各项任务,而不回到其结论或引发其异常。

     */
    public final void quietlyJoin() 

{

        doJoin();

    }

    

/**

     *开始继续执行此各项任务,并在必要时等待其完成,而不回到其结论或引发其异常。

     */
    public final void quietlyInvoke() 

{

        doInvoke();

    }

    

/**

     * 用作等待所有正在继续执行的各项任务完成。

     */
    public static void helpQuiesce() 

{

        Thread t;

        if ((t = Thread.currentThread()) instanceof

 ForkJoinWorkerThread) {

            ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;

wt.pool.helpQuiescePool(wt.workQueue);

        }

        else

            ForkJoinPool.quiesceCommonPool();

    }

    

/**

     * 重置此各项任务的内部簿记状态,允许后续

     */
    public void reinitialize() 

{

        if

 ((status & DONE_MASK) == EXCEPTIONAL)

            clearExceptionalCompletion();

        else            status = 0

;

    }

    

/**

     */
    public static ForkJoinPool getPool() 

{

        Thread t = Thread.currentThread();

        return (t instanceof

ForkJoinWorkerThread) ?

            ((ForkJoinWorkerThread) t).pool : null

;

    }

    

/**

     *判断当前是否为缓存池继续执行,如果是则回到true

     */
    public static boolean inForkJoinPool() 

{

        return Thread.currentThread() instanceof

 ForkJoinWorkerThread;

    }

    

/**

     * 尝试取消继续执行的各项任务

     */
    public boolean tryUnfork() 

{

        Thread t;

                return (((t = Thread.currentThread()) instanceof

ForkJoinWorkerThread) ?

                ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this

) :

ForkJoinPool.common.tryExternalUnpush(this

));

    }

    

/**

     * 回到当前各项任务的队列计数(排队数)

     */
    public static int getQueuedTaskCount() 

{

Thread t; ForkJoinPool.WorkQueue q;

        if ((t = Thread.currentThread()) instanceof

 ForkJoinWorkerThread)

q = ((ForkJoinWorkerThread)t).workQueue;

        else

            q = ForkJoinPool.commonSubmitterQueue();

        return (q == null) ? 0

 : q.queueSize();

    }

    

/**

的排队数

     *

     * @return

 the surplus number of tasks, which may be negative

     */
    public static int getSurplusQueuedTaskCount() 

{

        return

 ForkJoinPool.getSurplusQueuedTaskCount();

    }

    // Extension methods    

/**

* 回到的结论(即使此各项任务异常完成)或 {@code

 null}(如果此各项任务未知已完成)

     *

     * @return the result, or {@code

 null} if not completed

     */
    public abstract V getRawResult()

;

    

/**

     * Forces the given value to be returned as a result. This method

* is designed to support extensions, and should not in general be

     * called otherwise.

     *

     * @param

 value the value

     */
    protected abstract void setRawResult(V value)

;

    

/**

     * 抽象的继续执行形式 (子类须要实现)

     */
    protected abstract boolean exec()

;

    

/**

     */
    protected static

 ForkJoinTask peekNextLocalTask() {

        Thread t; ForkJoinPool.WorkQueue q;

        if((t = Thread.currentThread())instanceof

 ForkJoinWorkerThread)

            q = ((ForkJoinWorkerThread)t).workQueue;

        else

q = ForkJoinPool.commonSubmitterQueue();

        return (q == null) ? null

 : q.peek();

    }

    

/**

     */
    protected static

 ForkJoinTask pollNextLocalTask() {

        Thread t;

        return ((t = Thread.currentThread()) instanceof

ForkJoinWorkerThread) ?

            ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :

            null

;

    }

    

/**

* 用作从当前缓存绑定的工作队列和共享队

     */
    protected static

 ForkJoinTask pollTask() {

        Thread t; ForkJoinWorkerThread wt;

        return((t = Thread.currentThread())instanceof

 ForkJoinWorkerThread) ?

(wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :

            null

;

    }

    // tag operations    

/**

     * @since

 1.8

     */
    public final short getForkJoinTaskTag() 

{

        return (short

)status;

    }

    

/**

     * 以原子形式设置此各项任务的标记值。

     */
    public final short setForkJoinTaskTag(short tag) 

{

        for (int

s;;) {

            if (U.compareAndSwapInt(this

, STATUS, s = status,

                                    (s & ~SMASK) | (tag & SMASK)))

                return (short

)s;

        }

    }

    

/**

     *以原子形式有条件地设置此各项任务的标记值。

     */
    public final boolean compareAndSetForkJoinTaskTag(short e, short tag) 

{

        for (int

s;;) {

            if ((short

)(s = status) != e)

                return false

;

            if (U.compareAndSwapInt(this

, STATUS, s,

(s & ~SMASK) | (tag & SMASK)))

                return true

;

        }

    }

    

/**

     * 适配器类

     */
    static final class AdaptedRunnable<T> extends ForkJoinTask<T

>

        implements RunnableFuture<T>

{

        //用作存储被适配的 Runnable 对象。        final

 Runnable runnable;

        T result;

        //构造形式 缓存和结论为入参

AdaptedRunnable(Runnable runnable, T result) {

            if (runnable == null) throw new

 NullPointerException();

            this

.runnable = runnable;

            this.result = result; // OK to set this even before completion

        }

                public final T getRawResult() { return

 result; }

        //设置继续执行结论形式        public final void setRawResult(T v) 

{ result = v; }

        //继续执行回到true        public final boolean exec() { runnable.run(); return true

; }

        //运行无回到        public final void run() 

{ invoke(); }

        private static final long serialVersionUID = 5232453952276885070L

;

    }

    

/**

     * 有回到的适配器类 与上面的类似,唯一的差别是没有回到

     */
    static final class AdaptedRunnableAction extends ForkJoinTask<Void

>

        implements RunnableFuture<Void>

{

        final

 Runnable runnable;

AdaptedRunnableAction(Runnable runnable) {

            if (runnable == null) throw new

 NullPointerException();

            this

.runnable = runnable;

        }

        public final Void getRawResult() { return null

; }

        public final void setRawResult(Void v) 

{ }

        public final boolean exec() { runnable.run(); return true

; }

        public final void run() 

{ invoke(); }

        private static final long serialVersionUID = 5232453952276885070L

;

    }

    

/**

     * 同上类似(带异常)

     */
    static final class RunnableExecuteAction extends ForkJoinTask<Void>

{

        final

 Runnable runnable;

        RunnableExecuteAction(Runnable runnable) {

            if(runnable ==null) throw new

 NullPointerException();

            this

.runnable = runnable;

        }

        public final Void getRawResult() { return null

; }

        public final void setRawResult(Void v) 

{ }

        public final boolean exec() { runnable.run(); return true

; }

        void internalPropagateException(Throwable ex) 

{

            rethrow(ex); // rethrow outside exec() catches.

        }

        private static final long serialVersionUID = 5232453952276885070L

;

    }

    

/**

     * 适配器

     */
    static final class AdaptedCallable<T> extends ForkJoinTask<T

>

        implements RunnableFuture<T>

{

        final

 Callable callable;

        T result;

AdaptedCallable(Callable callable) {

            if (callable == null) throw new

 NullPointerException();

            this

.callable = callable;

        }

        public finalTgetRawResult() { return

 result; }

        public final void setRawResult(T v) 

{ result = v; }

        public final boolean exec() 

{

            try

 {

                result = callable.call();

                return true

;

            } catch

 (Error err) {

                throw

 err;

            } catch

(RuntimeException rex) {

                throw

 rex;

            } catch

 (Exception ex) {

                throw new

 RuntimeException(ex);

            }

        }

        public final void run() 

{ invoke(); }

        private static final long serialVersionUID = 2838392045355241008L

;

    }

    

/**

     * 回到两个新的AdaptedRunnableAction适配器

     */
    public static

 ForkJoinTask adapt(Runnable runnable) {

        return new

 AdaptedRunnableAction(runnable);

    }

    

/**

* 回到两个AdaptedRunnable适配器

     */
    public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) 

{

        return new

AdaptedRunnable(runnable, result);

    }

    

/**

     * 回到两个AdaptedCallable适配器

     */
    public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) 

{

        return new

 AdaptedCallable(callable);

    }

    // Serialization support    private static final long serialVersionUID = –7721805057305804111L

;

    

/**

     * 输入对象流

     */
    private void writeObject(java.io.ObjectOutputStream s)        throws java.io.IOException

{

        s.defaultWriteObject();

        s.writeObject(getException());

    }

    

/**

     * 读文件

     */
    private void readObject(java.io.ObjectInputStream s)        throws java.io.IOException, ClassNotFoundException

{

s.defaultReadObject();

        Object ex = s.readObject();

        if (ex != null

)

setExceptionalCompletion((Throwable)ex);

    }

    // Unsafe mechanics    private static final

 sun.misc.Unsafe U;

    private static final long

 STATUS;

    //初始化静态标识符块    static

 {

        //实例会异常锁        exceptionTableLock = new

 ReentrantLock();

        //实例会引用对象        exceptionTableRefQueue = new

 ReferenceQueue();

        //实例化异常节点对象        exceptionTable = new

 ExceptionNode[EXCEPTION_MAP_CAPACITY];

        try

 {

            //实例化unsafe

U = sun.misc.Unsafe.getUnsafe();

            Class k = ForkJoinTask.class;

            STATUS = U.objectFieldOffset

                (k.getDeclaredField(“status”

));

        } catch

 (Exception e) {

            throw new

 Error(e);

        }

    }

}

最后

非常可惜到目前为止,我在很多项目中及跟很多做开发的同事或朋友,这个ForkJoin用到的很少,绝大部分还是停留 在CURD,好可惜,但是其实数据库查询强化、图像处置、机器自学、大数据处置等那些情景都能通过ForkJoin的博戈达计算能力来实现那些情景。当然这个ForkJoin还有非常非常多的用法,能另行了解。

jdk-Futeure&ForkJoin框架源码学习

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务