结语
AQS是用以构筑锁和其它并行模块的此基础架构,它也是Java五大mammalian辅助工具类(CountDownLatch、CyclicBarrier、Semaphore)的此基础。ReentrantLock,即使BlockingQueue也是如前所述它的同时实现,能说是十分关键了。
单纯如是说呵呵,AQS只不过是两个类,全名是AbstractQueuedSynchronizer,堆栈Amou。责任编辑的重点项目是科学研究它的源代码,其它的此基础就不多说啦。想介绍AQS此基础的老师能看呵呵3y元老的该文
。
先期有良机就要撷取他们对ReentrantLock,LinkedBlockingQueue,缓存池源代码的认知~
该文编者按:AQS关键核心成员变一、AQS的关键核心成员表达式
AQS中主要就保护了state(锁状况的则表示)和两个可堵塞的等候堆栈。
private volatile int state;
有关state的get,set方式就不贴了,关键的是有个透过CAS修正state的方式。
//增设平均数,想修正的值。透过CAS操作方式同时实现。
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
除此之外,还保护了等候堆栈(也叫CLH堆栈,并行堆栈)的头节点和尾节点。
private transient volatile Node head;
private transient volatile Node tail;
二、内部类-Node
AQS的工作模式分为独占模式和共享模ntLock就同时实现了独占模式;但也有例外,ReentrantReadAndWriteLock同时实现了独占模式和共享模式。下面来看Node相关源代码。
//当前节点处于共享模式的标记
static final Node SHARED = new Node();
//当前节点处于独占模式的标记
static final Node EXCLUSIVE = null;
//缓存被取消了
static final int CANCELLED = 1;
//释放资源后需唤醒后继节点 static final int SIGNAL = –1;
//等候condition唤醒
static final int CONDITION = –2;
//工作于共享锁状况,需要向后传播,
//比如根据资源是否剩余,唤醒后继节点 static final int PROPAGATE = –3;
//等候状况,有1,0,-1,-2,-3五个值。分别对应上面的值
volatile int waitStatus;
//前驱节点 volatile Node prev;
//后继节点
volatile Node next;
//等候锁的缓存
volatile Thread thread;
//等候条件的下两个节点,ConditonObject中用到 Node nextWaiter;
对于等候状况(waitStatus)做两个解释。
CANCELLED =1 缓存被取消了SIGNAL =-1 释放资源后需唤醒后继节点CONDITION = -2 等候condition唤醒PROPAGATE = -3 (共享锁)状况需要向后传播0 初始状况,正常状况CANCELLED
作废状况,该节点的缓存由于超时,中断等原因而处于作废状况。是不可逆的,一旦处于这个状态,说明应该将该节点移除了。
SIGNAL
待唤醒后继状况,当前节点的缓存处于此状况,后继节点会被挂起,当前节点释放锁或取消之后必须唤醒它的后继节点。
CONDITION
等候状况,表明节点对应的缓存因为不满足两个条件(Condition)而被堵塞。
red()。先来看acquire(),该方式只工作于独占模式。
用了这个方式)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
//让缓存处于一种自旋状况,
从自旋过程中 //退出,否则继续。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
之前也提到了AQS使用了模板方式模式,只不过tryAcuire()方式是两个钩子方式。在AQS中,此方式会抛出UnsupportedOperationException,所以需要子类去同时实现。tryAcquire(arg)返回false,只不过是
addWaiter():将当前缓存插入至队尾,返回在等候堆栈中的节点(是处理了它的前驱后继)。
private Node addWaiter(Node mode) {
//把当前缓存封装为node,指定资源访问模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure Node pred = tail;
//如果tail不为空,把node插入末尾
if (pred != null) {
node.prev = pred;
//此时可能有其它缓存插入,所以使用CAS重新判断tail if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果tail为空,说明堆栈还没有初始化,执行enq() enq(node);
return node;
}
enq():将节点插入队尾,失败则自旋,直到成功。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//虽然tail==null才会执行本方式
//但是可能刚好有其它缓存插入,会导致
//之前的判断失效,所以重新判断tail是否为空
//队尾为空,说明堆栈中没有节点
//初始化头尾节点
if (t == null) {
if (compareAndSetHead(new Node()))
//初始化完成后,接着走下两个循环,
//直到node正常插入尾部
tail = head;
} else {
//下面是链表的正常插入操作方式了 node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
是否需要被挂起。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果节点的前驱是堆栈的头节点并且能拿到资源
//成功后则返回中断位结束
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire(Node, Node)检测当前节点是否应该park() //parkAndCheckInterrupt()用于中断当前节点中的缓存
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire():判断当前节点是否应该被挂起。下面涉及到的等候状况,这里再回忆呵呵,CANCELLED =1,SIGNAL =-1,CONDITION = -2,PROPAGATE = -3,0
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//前驱节点的状况是SIGNAL,说明前驱节点释放资源后会通知他们
//此时当前节点能安全的park(),因此返回true
return true;
if (ws > 0) {
//此时一直往前找,直到找到最近的两个处于正常等候状况的节点
//并排在它后面,返回false
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前驱节点的状况是0或PROPGATE,则利用CAS将前置节点的状况置 //为SIGNAL,让它释放资源后通知他们
//如果前置节点刚释放资源,状况就不是SIGNAL了,这时就会失败
// 返回false
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt():若确定有必要park,才会执行此方式。
private final boolean parkAndCheckInterrupt() {
//使用LockSupport,挂起当前缓存
LockSupport.park(this);
return Thread.interrupted();
}
selfInterrupt():对当前缓存产生两个中断请求。能走到这个方式,说明acquireQueued()返回true,就进行自我中断。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
aquire的步骤:
de.EXCLUSIVE), arg)方式把当前缓存添加到等候堆栈队尾,并标记为独占模式。
ue,则将当前缓存中断;false则说明拿到资源了。
5)在进行是否需要挂起的判断中,如果前置节点是SIGNAL状况,就挂起,返回true。如果前置节点状况为CANCELLED,就一直往前找,直到找到最近的两个处于正常等候状况的节点,并排在它后面,返回false,acquireQueed()接着自旋尝试,回到3)。
6)前置节点处于其它状况,利用CAS将前置节点状况置为SIGNAL。当前置节点刚释放资源,状况就不是SIGNAL了,导致失败,返回false。但凡返回false,就导致acquireQueed()接着自旋尝试。
7)最终当tryAcquire(int)返回false,acquireQueued(Node,int)返回true,调用selfInterrupt(),中断当前缓存。
acquir
public final void acquireShared(int arg) {
//模板方式模式,tryAcquireShared由子类同时实现
//想看的话推荐读写锁的源代码,这里就不细述了 if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
doAcquireShared():同时实现上和acquire()方式差不多,是多判断了是否还有剩余资源,唤醒后继节点。
private void doAcquireShared(int arg) {
//将缓存加入等候堆栈,增设为共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//是否需要被挂起 if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.3 hasQueuedPredecessors()–公平锁在tryAqcuire()时调用
这里补充两个方式,ReentrantLock如果是公平锁的话,会调用AQS中的这个方式,算是先期该文的铺垫吧。
boolean hasQueuedPredecessors():判断当前缓存是否位于CLH并行堆栈中的第两个。如果是则返回flase,否则返回true。
public final boolean hasQueuedPredecessors() {
//判断当前节点在等候堆栈中是否为头节点的后继节点(头节点不存储数据),
//如果不是,则说明有缓存比当前缓存更早的请求资源, //根据公平性,当前缓存请求资源失败。
//如果当前节点没有前驱节点的话,才有做后面的逻辑判断的必要性
Node t = tail; // Read fields in reverse initialization order Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
这个方式在ReentrantLock.tryLock()过程中被调用。
doAcquireNanos():这个方式只工作于独占模
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//计算截至时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return true;
}
//重新计算超时时间 nanosTimeout = deadline – System.nanoTime();
//超时则返回false
if (nanosTimeout <= 0L)
return false;
//否则判断是否需要被堵塞,堵塞规定时间 if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这个方式在ReentrantLock.lockInterruptibly()过程中被调用。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//添加到等候堆栈,包装成Node
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC failed = false;
return;
}
//自旋,直到前驱节点等候状况为SIGNAL,检查中断标志
//符合条件则堵塞当前缓存
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//当前缓存被堵塞后,会中断
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
四、释放资源(锁)
同样的,释放资源也分为释放独占锁资源(release())和共享锁(releaseShared())资源。
先来看对于独占锁的释放。
4.1 release()–独占模式释放资源
release():工作于独占模式,首先调用子类的tryRelease()方式释放锁,然后唤醒后继节点,在唤醒的过程中,需要判断后继节点是否满足情况,如果后继节点不为空且不是作废状况,则唤醒这个后继节点,否则从tail节点向前寻找合适的节点,如果找到,则唤醒。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccesso
private void unparkSuccessor(Node node) {
//如果状况为负说明是除CANCEL以外的状况, //尝试在等候信号时清除。
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//下两个节点为空或CANCELLED状况 //则从队尾往前找,找到正常状况的节点作为之后的继承人
//也是下两个能拿到资源的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//此时找到继承人了,那么就唤醒它
if (s != null)
LockSupport.unpark(s.thread);
}
4.2 releaseShared()–共享模式释放资源
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
doReleaseShared():把当前结点的状况由SIGNAL增设为PROPAGATE状况。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
//将自旋尝试修正等候状况为0 continue;
//唤醒下两个被堵塞的节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
//增设为PROPAGATE状况
continue;
}
//上面代码块执行过程中如果head变更了,就接着循环尝试
if (h == head)
break;
}
}
五、内部类-ConditionObject
ConditionObject同时实现了Condition接口。用于缓存间的通信,能够把锁粒度减小。重点项目是await()和signal()。这个内部类还保护了两个condition堆栈,而且Node.nextWaiter是用以将condition连接起来的。
//condition队头
private transient Node firstWaiter;
//condition队尾
private transient Node lastWaiter;
//发生了中断,但在先期不抛出中断异常,而是“补上”这次中断
private static final int REINTERRUPT = 1;
//发生了中断,且在先期需要抛出中断异常 private static final int THROW_IE = –1;
5.1 await()–堵塞等候方式
5.1.1 await的流程
await():当前缓存处于堵塞状况,直到调用signal()或中断才能被唤醒。
1)将当前缓存封装成node且等候状况为CONDITION。
3)加入到条件堆栈后,则堵塞当前缓存,等候被唤醒。
4)如果是因signal被唤醒,则节点会从条件堆栈转移到等候堆栈;如果是因中断被唤醒,则记录中断状况。两种情况都会跳出循环。
public final void await() throws InterruptedException {
//如果被中断,就处理中断异常
if (Thread.interrupted())
throw new InterruptedException();
//初始化链表的功能,增设当前缓存为链尾
Node node = addConditionWaiter();
//释放当前节点持有的所有资源
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果当前缓存不在等候堆栈中,
//说明此时一定在条件堆栈里,将其堵塞。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//说明中断状况发生变化
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//当前缓存执行了signal方式会经过这个,也是重新将当前缓存加入并行堆栈中 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//删除条件堆栈中不满足要求的元素
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//处理被中断的情况
if (interruptMode != 0)
//这里是个难点,具体的同时实现我他们也有点不认知
//就把知道的都写出来
//如果是THROW_IE,说明signal之前发生中断
//如果是REINTERRUPT,signal之后中断, reportInterruptAfterWait(interruptMode);
}
addConditionWaiter():将当前缓存封装成节点,添加到条件堆栈尾部,并返回当前节点。
private Node addConditionWaiter() {
Node t = lastWaiter;
// 判断队尾元素,如果非条件等候状况则清理出去
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
//可能t之前引用的节点被删除了,所以要重新引用
t = lastWaiter;
}
//这个节点就则表示当前缓存
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//说明条件按堆栈中没有元素
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
unlinkCancelledWaiters():遍历一遍条件堆栈,删除非CONDITION状况的节点。
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
//记录在循环中上两个waitStatus有效的节点
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
//再次判断等候状况,保证节点都是CONDITION状况
//确保当前节点无效后删除引用
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
//否则就直接加到队尾的后面
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
//记录有效的节点并向后遍历
trail = t;
t = next;
}
}
5.1.2 await中有关中断的处理
透过对上面代码的观察,我们知道await()调用了checkInterruptWhileWaiting()。
有关中断这一块,我他们看的也比较迷,就把一些他们能认知的地方标注呵呵。
checkInterruptWhileWaiting():判断在堵塞过程中是否被中断。如果返回THROW_IE,则则表示缓存在调用signal()之前中断的;如果返回REINTERRUPT,则表明缓存在调用signal()之后中断;如果返回0则则表示没有被中断。
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
transferAfterCancelledWait():缓存是否因为中断从park中唤醒。
final boolean transferAfterCancelledWait(Node node) {
//如果修正成功,暂且认为中断发生后,signal()被调用
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
//true则表示中断先于signal发生
return true;
}
//~~不认知~~
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
5.2.1 signal()–唤醒condition堆栈中的缓存
signal():唤醒两个被堵塞的缓存。
public final void signal() {
//判断当前缓存是否为资源的持有者
//这也是必须在lock()与unlock()代码中间执行的原因
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//开始唤醒条件堆栈的第两个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
doSignal():将条件堆栈的头节点从条件堆栈转移到等候堆栈,并且,将该节点从条件堆栈删除。
private void doSignal(Node first) {
do {
//先期的等候条件为空,说明condition堆栈中只有一个节点 if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//transferForSignal()是真正唤醒头节点的地方 } while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
transferForSignal():将节点放入等候堆栈并唤醒。并不需要在条件堆栈中移除,因为条件堆栈每次插入时都会把状况不为CONDITION的节点清理出去。
final boolean transferForSignal(Node node) {
//当前节点等候状况改变失败,则说明当前节点不是CONDITION //状况,那就不能进行接下来的操作方式,返回false
//0是正常状况
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//放入等候堆栈队尾中,并返回之前堆栈的前两个节点
Node p = enq(node);
int ws = p.waitStatus;
//如果节点没有被取消,或更改状况失败,则唤醒被堵塞的缓存 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
5.2.2 signalAll()–唤醒condition堆栈中所有缓存
signalAll()本质上还是调用了doSignalAll()
doSignalAll():遍历条件堆栈,插入到等候堆栈中。
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
5.3 awaitNanos()–超时机制
这里补充两个方式awaitNanos(),是我看堵塞堆栈源代码中遇到的。
awaitNanos():轮询检查缓存是否在并行缓存上,如果在则退出自旋。否则检查是否已超过解除挂起时间,如果超过,则退出挂起,否则继续挂起缓存到等候解除挂起。退出挂起之后,采用自旋的方式竞争锁。
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
//采用自旋的方式检查是否已在等候堆栈当中 while (!isOnSyncQueue(node)) {
//如果挂起超过一定的时间,则退出
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//继续挂起缓存
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline – System.nanoTime();
}
//采用自旋的方式竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline – System.nanoTime();
}
总结
里面保护了两个堆栈,两个是等候堆栈(CLH),还有两个是条件堆栈(condition)。
release()释放资源,需要唤醒后继节点,判断后继节点是否满足情况。如果后继节点不为空且不是作废状况,则唤醒这个后继节点;否则从尾部往前面找适合的节点,找到则唤醒。
调用await(),缓存会进入条件堆栈,等候被唤醒
如有不当之处,欢迎指出~
如果喜欢我的该文,欢迎关注我的专栏~
参考该文:Javamammalian编程札记,AQS简单纯单过一遍,Condition源代码分析,JUC.Condition学习笔记[附详尽源代码解析]