AQS源码详细解读

2023-01-16 0 969

结语

AQS是用以构筑锁和其它并行模块的此基础架构,它也是Java五大mammalian辅助工具类(CountDownLatch、CyclicBarrier、Semaphore)的此基础。ReentrantLock,即使BlockingQueue也是如前所述它的同时实现,能说是十分关键了。

单纯如是说呵呵,AQS只不过是两个类,全名是AbstractQueuedSynchronizer,堆栈Amou。责任编辑的重点项目是科学研究它的源代码,其它的此基础就不多说啦。想介绍AQS此基础的老师能看呵呵3y元老的该文

先期有良机就要撷取他们对ReentrantLock,LinkedBlockingQueue,缓存池源代码的认知~

该文编者按:AQS关键核心成员变

一、AQS的关键核心成员表达式

AQS中主要就保护了state(锁状况的则表示)和两个可堵塞的等候堆栈。

private volatile int state;

有关state的get,set方式就不贴了,关键的是有个透过CAS修正state的方式。

//增设平均数,想修正的值。透过CAS操作方式同时实现。 protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }

除此之外,还保护了等候堆栈(也叫CLH堆栈,并行堆栈)的头节点和尾节点。

private transient volatile Node head; private transient volatile Node tail;

二、内部类-Node

AQS的工作模式分为独占模式和共享模ntLock就同时实现了独占模式;但也有例外,ReentrantReadAndWriteLock同时实现了独占模式和共享模式。下面来看Node相关源代码。

//当前节点处于共享模式的标记 static final Node SHARED = new Node(); //当前节点处于独占模式的标记 static final Node EXCLUSIVE = null; //缓存被取消了 static final int CANCELLED = 1; //释放资源后需唤醒后继节点 static final int SIGNAL = 1; //等候condition唤醒 static final int CONDITION = 2; //工作于共享锁状况,需要向后传播, //比如根据资源是否剩余,唤醒后继节点 static final int PROPAGATE = 3; //等候状况,有1,0,-1,-2,-3五个值。分别对应上面的值 volatile int waitStatus; //前驱节点 volatile Node prev; //后继节点 volatile Node next; //等候锁的缓存 volatile Thread thread; //等候条件的下两个节点,ConditonObject中用到 Node nextWaiter;

对于等候状况(waitStatus)做两个解释。

CANCELLED =1 缓存被取消了SIGNAL =-1 释放资源后需唤醒后继节点CONDITION = -2 等候condition唤醒PROPAGATE = -3 (共享锁)状况需要向后传播0 初始状况,正常状况

CANCELLED

作废状况,该节点的缓存由于超时,中断等原因而处于作废状况。是不可逆的,一旦处于这个状态,说明应该将该节点移除了。

SIGNAL

待唤醒后继状况,当前节点的缓存处于此状况,后继节点会被挂起,当前节点释放锁或取消之后必须唤醒它的后继节点。

CONDITION

等候状况,表明节点对应的缓存因为不满足两个条件(Condition)而被堵塞。

red()。先来看acquire(),该方式只工作于独占模式。 用了这个方式)
public final void acquire(int arg) { if (!tryAcquire(arg) && //让缓存处于一种自旋状况, 从自旋过程中 //退出,否则继续。 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

之前也提到了AQS使用了模板方式模式,只不过tryAcuire()方式是两个钩子方式。在AQS中,此方式会抛出UnsupportedOperationException,所以需要子类去同时实现。tryAcquire(arg)返回false,只不过是

addWaiter():将当前缓存插入至队尾,返回在等候堆栈中的节点(是处理了它的前驱后继)。

private Node addWaiter(Node mode) { //把当前缓存封装为node,指定资源访问模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //如果tail不为空,把node插入末尾 if (pred != null) { node.prev = pred; //此时可能有其它缓存插入,所以使用CAS重新判断tail if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果tail为空,说明堆栈还没有初始化,执行enq() enq(node); return node; }

enq():将节点插入队尾,失败则自旋,直到成功。

private Node enq(final Node node) { for (;;) { Node t = tail; //虽然tail==null才会执行本方式 //但是可能刚好有其它缓存插入,会导致 //之前的判断失效,所以重新判断tail是否为空 //队尾为空,说明堆栈中没有节点 //初始化头尾节点 if (t == null) { if (compareAndSetHead(new Node())) //初始化完成后,接着走下两个循环, //直到node正常插入尾部 tail = head; } else { //下面是链表的正常插入操作方式了 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

是否需要被挂起。

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //如果节点的前驱是堆栈的头节点并且能拿到资源 //成功后则返回中断位结束 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //shouldParkAfterFailedAcquire(Node, Node)检测当前节点是否应该park() //parkAndCheckInterrupt()用于中断当前节点中的缓存 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

shouldParkAfterFailedAcquire():判断当前节点是否应该被挂起。下面涉及到的等候状况,这里再回忆呵呵,CANCELLED =1,SIGNAL =-1,CONDITION = -2,PROPAGATE = -3,0

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //前驱节点的状况是SIGNAL,说明前驱节点释放资源后会通知他们 //此时当前节点能安全的park(),因此返回true return true; if (ws > 0) { //此时一直往前找,直到找到最近的两个处于正常等候状况的节点 //并排在它后面,返回false do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //前驱节点的状况是0或PROPGATE,则利用CAS将前置节点的状况置 //为SIGNAL,让它释放资源后通知他们 //如果前置节点刚释放资源,状况就不是SIGNAL了,这时就会失败 // 返回false compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

parkAndCheckInterrupt():若确定有必要park,才会执行此方式。

private final boolean parkAndCheckInterrupt() { //使用LockSupport,挂起当前缓存 LockSupport.park(this); return Thread.interrupted(); }

selfInterrupt():对当前缓存产生两个中断请求。能走到这个方式,说明acquireQueued()返回true,就进行自我中断。

static void selfInterrupt() { Thread.currentThread().interrupt(); }

aquire的步骤:

de.EXCLUSIVE), arg)方式把当前缓存添加到等候堆栈队尾,并标记为独占模式。

ue,则将当前缓存中断;false则说明拿到资源了。

5)在进行是否需要挂起的判断中,如果前置节点是SIGNAL状况,就挂起,返回true。如果前置节点状况为CANCELLED,就一直往前找,直到找到最近的两个处于正常等候状况的节点,并排在它后面,返回false,acquireQueed()接着自旋尝试,回到3)。

6)前置节点处于其它状况,利用CAS将前置节点状况置为SIGNAL。当前置节点刚释放资源,状况就不是SIGNAL了,导致失败,返回false。但凡返回false,就导致acquireQueed()接着自旋尝试。

7)最终当tryAcquire(int)返回false,acquireQueued(Node,int)返回true,调用selfInterrupt(),中断当前缓存。

acquir
public final void acquireShared(int arg) { //模板方式模式,tryAcquireShared由子类同时实现 //想看的话推荐读写锁的源代码,这里就不细述了 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }

doAcquireShared():同时实现上和acquire()方式差不多,是多判断了是否还有剩余资源,唤醒后继节点。

private void doAcquireShared(int arg) { //将缓存加入等候堆栈,增设为共享模式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //是否需要被挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

3.3 hasQueuedPredecessors()–公平锁在tryAqcuire()时调用

这里补充两个方式,ReentrantLock如果是公平锁的话,会调用AQS中的这个方式,算是先期该文的铺垫吧。

boolean hasQueuedPredecessors():判断当前缓存是否位于CLH并行堆栈中的第两个。如果是则返回flase,否则返回true。
public final boolean hasQueuedPredecessors() { //判断当前节点在等候堆栈中是否为头节点的后继节点(头节点不存储数据), //如果不是,则说明有缓存比当前缓存更早的请求资源, //根据公平性,当前缓存请求资源失败。 //如果当前节点没有前驱节点的话,才有做后面的逻辑判断的必要性 Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }

这个方式在ReentrantLock.tryLock()过程中被调用。

doAcquireNanos():这个方式只工作于独占模
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; //计算截至时间 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return true; } //重新计算超时时间 nanosTimeout = deadline System.nanoTime(); //超时则返回false if (nanosTimeout <= 0L) return false; //否则判断是否需要被堵塞,堵塞规定时间 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

这个方式在ReentrantLock.lockInterruptibly()过程中被调用。

private void doAcquireInterruptibly(int arg) throws InterruptedException { //添加到等候堆栈,包装成Node final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } //自旋,直到前驱节点等候状况为SIGNAL,检查中断标志 //符合条件则堵塞当前缓存 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //当前缓存被堵塞后,会中断 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

四、释放资源(锁)

同样的,释放资源也分为释放独占锁资源(release())和共享锁(releaseShared())资源。

先来看对于独占锁的释放。

4.1 release()–独占模式释放资源

release():工作于独占模式,首先调用子类的tryRelease()方式释放锁,然后唤醒后继节点,在唤醒的过程中,需要判断后继节点是否满足情况,如果后继节点不为空且不是作废状况,则唤醒这个后继节点,否则从tail节点向前寻找合适的节点,如果找到,则唤醒。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

unparkSuccesso

private void unparkSuccessor(Node node) { //如果状况为负说明是除CANCEL以外的状况, //尝试在等候信号时清除。 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //下两个节点为空或CANCELLED状况 //则从队尾往前找,找到正常状况的节点作为之后的继承人 //也是下两个能拿到资源的节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //此时找到继承人了,那么就唤醒它 if (s != null) LockSupport.unpark(s.thread); }

4.2 releaseShared()–共享模式释放资源

public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }

doReleaseShared():把当前结点的状况由SIGNAL增设为PROPAGATE状况。

private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //将自旋尝试修正等候状况为0 continue; //唤醒下两个被堵塞的节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //增设为PROPAGATE状况 continue; } //上面代码块执行过程中如果head变更了,就接着循环尝试 if (h == head) break; } }

五、内部类-ConditionObject

ConditionObject同时实现了Condition接口。用于缓存间的通信,能够把锁粒度减小。重点项目是await()和signal()。这个内部类还保护了两个condition堆栈,而且Node.nextWaiter是用以将condition连接起来的。

//condition队头 private transient Node firstWaiter; //condition队尾 private transient Node lastWaiter; //发生了中断,但在先期不抛出中断异常,而是“补上”这次中断 private static final int REINTERRUPT = 1; //发生了中断,且在先期需要抛出中断异常 private static final int THROW_IE = 1;

5.1 await()–堵塞等候方式

5.1.1 await的流程

await():当前缓存处于堵塞状况,直到调用signal()或中断才能被唤醒。

1)将当前缓存封装成node且等候状况为CONDITION。

3)加入到条件堆栈后,则堵塞当前缓存,等候被唤醒。

4)如果是因signal被唤醒,则节点会从条件堆栈转移到等候堆栈;如果是因中断被唤醒,则记录中断状况。两种情况都会跳出循环。

public final void await() throws InterruptedException { //如果被中断,就处理中断异常 if (Thread.interrupted()) throw new InterruptedException(); //初始化链表的功能,增设当前缓存为链尾 Node node = addConditionWaiter(); //释放当前节点持有的所有资源 int savedState = fullyRelease(node); int interruptMode = 0; //如果当前缓存不在等候堆栈中, //说明此时一定在条件堆栈里,将其堵塞。 while (!isOnSyncQueue(node)) { LockSupport.park(this); //说明中断状况发生变化 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //当前缓存执行了signal方式会经过这个,也是重新将当前缓存加入并行堆栈中 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //删除条件堆栈中不满足要求的元素 if (node.nextWaiter != null) unlinkCancelledWaiters(); //处理被中断的情况 if (interruptMode != 0) //这里是个难点,具体的同时实现我他们也有点不认知 //就把知道的都写出来 //如果是THROW_IE,说明signal之前发生中断 //如果是REINTERRUPT,signal之后中断, reportInterruptAfterWait(interruptMode); }

addConditionWaiter():将当前缓存封装成节点,添加到条件堆栈尾部,并返回当前节点。

private Node addConditionWaiter() { Node t = lastWaiter; // 判断队尾元素,如果非条件等候状况则清理出去 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); //可能t之前引用的节点被删除了,所以要重新引用 t = lastWaiter; } //这个节点就则表示当前缓存 Node node = new Node(Thread.currentThread(), Node.CONDITION); //说明条件按堆栈中没有元素 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }

unlinkCancelledWaiters():遍历一遍条件堆栈,删除非CONDITION状况的节点。

private void unlinkCancelledWaiters() { Node t = firstWaiter; //记录在循环中上两个waitStatus有效的节点 Node trail = null; while (t != null) { Node next = t.nextWaiter; //再次判断等候状况,保证节点都是CONDITION状况 //确保当前节点无效后删除引用 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else //否则就直接加到队尾的后面 trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else //记录有效的节点并向后遍历 trail = t; t = next; } }

5.1.2 await中有关中断的处理

透过对上面代码的观察,我们知道await()调用了checkInterruptWhileWaiting()。

有关中断这一块,我他们看的也比较迷,就把一些他们能认知的地方标注呵呵。

checkInterruptWhileWaiting():判断在堵塞过程中是否被中断。如果返回THROW_IE,则则表示缓存在调用signal()之前中断的;如果返回REINTERRUPT,则表明缓存在调用signal()之后中断;如果返回0则则表示没有被中断。
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }

transferAfterCancelledWait():缓存是否因为中断从park中唤醒。

final boolean transferAfterCancelledWait(Node node) { //如果修正成功,暂且认为中断发生后,signal()被调用 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); //true则表示中断先于signal发生 return true; } //~~不认知~~ while (!isOnSyncQueue(node)) Thread.yield(); return false; }

5.2.1 signal()–唤醒condition堆栈中的缓存

signal():唤醒两个被堵塞的缓存。

public final void signal() { //判断当前缓存是否为资源的持有者 //这也是必须在lock()与unlock()代码中间执行的原因 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //开始唤醒条件堆栈的第两个节点 Node first = firstWaiter; if (first != null) doSignal(first); }

doSignal():将条件堆栈的头节点从条件堆栈转移到等候堆栈,并且,将该节点从条件堆栈删除。

private void doSignal(Node first) { do { //先期的等候条件为空,说明condition堆栈中只有一个节点 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; //transferForSignal()是真正唤醒头节点的地方 } while (!transferForSignal(first) && (first = firstWaiter) != null); }

transferForSignal():将节点放入等候堆栈并唤醒。并不需要在条件堆栈中移除,因为条件堆栈每次插入时都会把状况不为CONDITION的节点清理出去。

final boolean transferForSignal(Node node) { //当前节点等候状况改变失败,则说明当前节点不是CONDITION //状况,那就不能进行接下来的操作方式,返回false //0是正常状况 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //放入等候堆栈队尾中,并返回之前堆栈的前两个节点 Node p = enq(node); int ws = p.waitStatus; //如果节点没有被取消,或更改状况失败,则唤醒被堵塞的缓存 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }

5.2.2 signalAll()–唤醒condition堆栈中所有缓存

signalAll()本质上还是调用了doSignalAll()

doSignalAll():遍历条件堆栈,插入到等候堆栈中。
private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }

5.3 awaitNanos()–超时机制

这里补充两个方式awaitNanos(),是我看堵塞堆栈源代码中遇到的。

awaitNanos():轮询检查缓存是否在并行缓存上,如果在则退出自旋。否则检查是否已超过解除挂起时间,如果超过,则退出挂起,否则继续挂起缓存到等候解除挂起。退出挂起之后,采用自旋的方式竞争锁。
public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; //采用自旋的方式检查是否已在等候堆栈当中 while (!isOnSyncQueue(node)) { //如果挂起超过一定的时间,则退出 if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } //继续挂起缓存 if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline System.nanoTime(); } //采用自旋的方式竞争锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline System.nanoTime(); }

总结

里面保护了两个堆栈,两个是等候堆栈(CLH),还有两个是条件堆栈(condition)。

release()释放资源,需要唤醒后继节点,判断后继节点是否满足情况。如果后继节点不为空且不是作废状况,则唤醒这个后继节点;否则从尾部往前面找适合的节点,找到则唤醒。

调用await(),缓存会进入条件堆栈,等候被唤醒

如有不当之处,欢迎指出~

如果喜欢我的该文,欢迎关注我的专栏~

参考该文:Javamammalian编程札记AQS简单纯单过一遍,Condition源代码分析JUC.Condition学习笔记[附详尽源代码解析]

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务