概要
AQS 不间断为
AbstractQueuedSynchronizer , 在
java.util.concurrent.locks雇请的两个tcsh。类的具体内容促进作用和结构设计在已经开始类叙述重要信息里头就有较好的表达
Provides a framework forimplementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed tobe a useful basisfor most kinds of synchronizers that rely on a single atomic {@codeint} value to represent state. Subclasses must define theprotected methods that change thisstate, and which define what that state meansin terms of this object being acquired or released. Given these, the other methods in this class carry outall queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated {@code int} value manipulated using methods {@link #getState}, {@link #setState} and {@link #compareAndSetState} istracked with respect to synchronization. 具体内容翻译为: 提供了实现阻塞锁和相关的框架,依赖于的同步器(信号量、事件等)。 先进先出 (FIFO) 等待队列。 这个类被结构设计为对于大多数依赖于单个原子 {@codeint} 值来表示状态。 @code int}使用方法 {@link #getState}、{@link 操作的值#setState} 和 {@link #compareAndSetState} 被地跟踪、同步。简单叙述为:通过原子性的int值来标记同步状态,实现阻塞锁机制,基于FIFO队列来实现排队机制
AQS实现了两种模式,独占模式和共享模式,实现共享锁和排他锁
LockSupport工具类
LockSupport 官方是这样叙述的 Basic thread blocking primitives for creating locks and other , 翻译为 “基本线程阻塞原语,用于创建锁和其他”,可以理解为使用计算机原语来创建锁,底层实现为unsafe类,主要方法为pack(线程阻塞)与unpack(唤醒线程),源代码如下:
// LockSupport设置阻塞分为两组, // 一组是不设置blocker的方法,另一组是设置blocker方法 // JDK推荐为Thread设置blocker方便调试 public static void park(Object blocker) { Thread t = Thread.currentThread(); // blocker位线程内存当前偏移量,设置线程内存偏移量, setBlocker(t, blocker); // Unsafe雇请的方法可以直接操作内存,设置线程阻塞 U.park(false, 0L); // 线程唤醒后 blockers设置为空 setBlocker(t, null); } public static void park() { // 线程阻塞,单纯唤醒,不会记录偏移量 U.park(false, 0L); } // 与park基本相同,添加阻塞事件,自动唤醒 public static void parkNanos(Object blocker, long nanos); public static void parkUntil(Object blocker, long deadline);// unpark方法为unsafe.unpark方法,为C++实现,基于内存进行操作 public static void unpark(Thread thread) { if (thread != null) U.unpark(thread); }AQS内部结构
AQS类图
AQS的实现类为NofairSync(非公平)、FairSync(公平)两种。
内部类Node
static final class Node { // 共享 static final Node SHARED = new Node(); // 独占 static finalNode EXCLUSIVE =null; // 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态; static final int CANCELLED = 1; // 后继节点的线程状态处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点线程继续运行 static final int SIGNAL = –1; // 节点在等待队列中,节 static final intCONDITION = –2; static final int PROPAGATE = –3; // 等待状态 volatile int waitStatus; // 前驱结点 volatile Node prev; // 后继节点 volatile Node next; // Node封装当前线程 volatile Thread thread; // 指向ConditionObject的下两个节点 Node nextWaiter; }节点状态
CANCELLED (1):当前线程因为超时或者中断被取消。这是两个终结态,也就是状态到此为止SIGNAL (-1):当前线程的后继线程被阻塞或者即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程。这个状态一般都是后继线程来设置前驱节点的CONDITION (-2):当前线程在condition队列中PROPAGATE (-3):用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在两个节点成为头节点之前,是不会跃迁为此状态的主要属性
/** * 等待队列的头节点, 赖加载 * 除了初始化之外, 只能通过 setHead 方法来改变其值 * 如果 head 不为 null, waitStatus 值就一定不会是 CANCELLED */ private transient volatile Node head; /** * 等待队列的尾结点, 懒加载 * 只能通过 enq 方法添加新节点时才会去改变尾结点 */ private transient volatile Node tail; private volatile int state;AQS属性结构结构设计如下
具体内容方法
开放实现接口API,用于场景自定义:
方法
促进作用
boolean tryAcquire(int arg)
boolean tryRelease(int arg)
试释放独占锁
int tryAcquireShared(int arg)
boolean tryReleaseShared(int arg)
试释放共享锁
boolean isHeldExclusively()
当前线程是否获得了独占锁
AQS本身将同步状态的管理用模板方法模式都封装好了,以下列举了AQS中的一些模板方法
方法
叙述
void acquire(int arg)
tryAcquire
void acquireInterruptibly(int arg)
响应中断版本的 acquire
boolean tryAcquireNanos(int arg,long nanos)
响应中断+带超时版本的 acquire
void acquireShared(int arg)
tryAcquireShared 方法
void acquireSharedInterruptibly(int arg)
响应中断版本的 acquireShared
boolean tryAcquireSharedNanos(int arg,long nanos)
响应中断+带超时版本的 acquireShared
boolean release(int arg)
释放独占锁
boolean releaseShared(int arg)
释放共享锁
Collection getQueuedThreads()
源代码层面上会对acquire、release、acquireShared 、releaseShared 进行详解
独占锁
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();// 线程中断 } private Node addWaiter(Node mode) { //创建Node节点Node node =new Node(mode); for (;;) { // 当前队尾进行标记 Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail);// CAS操作放到队尾 if (compareAndSetTail(oldTail, node)) { oldTail.next = node; returnnode; } }else { initializeSyncQueue(); } } } // 将该线程加入等待队列的尾部,并标记为独占模式 final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for(;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 成功之后将节点设置为新的头部节点 setHead(node); p.next = null; // help GC return interrupted; } // 判断线程是否需要中断, 判断标准是前节点状态 if (shouldParkAfterFailedAcquire(p, node)) //挂起当前线程interrupted |= parkAndCheckInterrupt(); } }catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throwt; } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驱节点状态 int ws = pred.waitStatus; // singal 表示前驱节点释放后会唤起当前线程,会继续等待,那么线程可以中断等待 if (ws == Node.SIGNAL) return true; // 前驱节点不是Head节点,并且状态为CANCELLED,那么就需要剔除前置节点,向前遍历,直到找到合适的前置 if (ws > 0) {do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { pred.compareAndSetWaitStatus(ws, Node.SIGNAL); }return false; }类来实现,具体内容分析会在ReentrantLock中进行详细代码分析。addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式补上// 独占锁释放节点 public final boolean release(int arg) { // tryRelease由实现类具体内容实现 if(tryRelease(arg)) { Node h = head;if (h != null && h.waitStatus != 0) // 唤醒下两个节点 unparkSuccessor(h); return true; } return false; }release方法通过tryRelease()实现了扩展性,来进行锁的重入结构设计,unparkSuccessor则自身实现,完成了下两个Node节点的唤醒
共享锁
public final void acquireShared(int arg) { // tryAcquireShared为抽象方法,由子类实现 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean interrupted = false; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next =null; // help GC return; } } // 上面由详细解读 if(shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } }catch(Throwable t) { cancelAcquire(node);throw t; } finally { if(interrupted) selfInterrupt(); } }// 释放共享锁 public final boolean releaseShared(int arg) { // 子类实现 if(tryReleaseShared(arg)) { doReleaseShared();return true; } return false; } //当前节点调用 private void doReleaseShared() { for(;;) { Node h = head;if (h != null && h != tail) { int ws = h.waitStatus; // 如果头节点状态为SIGNAL, 表示可以去释放锁 if(ws == Node.SIGNAL) {// 通过 cas 将 waitStatus 设为 0 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases // 线程唤醒 unparkSuccessor(h); } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }关于独占锁和共享锁的异同
相同
不同
1)独占锁只会释放头部后节点的线程,而共享锁会依次释放所有线程
2)独占锁存在非公平锁的情况,新的线程可能抢占队列中线程的锁,共享锁则不存在这种情况
扩展
AQS使用了模板方法的结构设计模式,以固定的方法进行调用,子类对其实现,方便扩展,后续会对具体内容实现类继续读源代码。
原文
https://www.cnblogs.com/Tonyzczc/p/16179428.html